BrokerController初始化的过程中,调用registerProcessor方法注册了处理器,在注册处理器的代码中可以看到创建了处理消息发送的处理器对象SendMessageProcessor,然后将其注册到远程服务中:
public class BrokerController {
// 初始化
public boolean initialize() throws CloneNotSupportedException {
// ...
// 注册处理器
this.registerProcessor();
// ...
}
// 注册处理器
public void registerProcessor() {
/**
* 发送消息处理器
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
// ...
// 注册消息发送处理器
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
// 省略其他注册...
}
}
在Broker收到生产者的发送消息请求时,会进入到SendMessageProcessor的processRequest方法中处理请求,然后又会调用asyncProcessRequest异步处理消息,然后从请求中解析请求头数据,并判断是否是批量发送消息的请求,如果是批量发送消息调用asyncSendBatchMessage方法处理,否则调用asyncSendMessage方法处理单个消息:
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
// 处理请求
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
RemotingCommand response = null;
try {
// 处理请求
response = asyncProcessRequest(ctx, request).get();
} catch (InterruptedException | ExecutionException e) {
log.error("process SendMessage error, request : " + request.toString(), e);
}
return response;
}
// 异步处理请求
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.asyncConsumerSendMsgBack(ctx, request);
default:
// 解析请求头
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
// ...
if (requestHeader.isBatch()) {
// 批量消息发送处理
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
// 单个消息发送处理
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
}
}
}
// 单个消息发送处理
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
// ...
CompletableFuture<PutMessageResult> putMessageResult = null;
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 是否使用事务
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
// 事务处理
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
// 消息持久化
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
}
以单个消息的发送处理方法asyncSendMessage为例看一下消息的接收过程:
MessageExtBrokerInner对象,对消息的相关内容进行封装,将主题信息、队列ID、消息内容、消息属性、发送消息时间、发送消息的主机地址等信息设置到MessageExtBrokerInner中brokerController的getMessageStore方法获取MessageStore对象,然后调用asyncPutMessage方法对消息进行持久化存储public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
// 单个消息发送处理
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
// ...
// 创建MessageExtBrokerInner对象,之后使用这个对象来操纵消息
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
// 设置主题
msgInner.setTopic(requestHeader.getTopic());
// 设置消息所在的队列ID
msgInner.setQueueId(queueIdInt);
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return CompletableFuture.completedFuture(response);
}
// 设置消息内容
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
// 设置属性
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
MessageAccessor.setProperties(msgInner, origProps);
// 设置发送消息时间
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
// 设置发送消息的主机地址
msgInner.setBornHost(ctx.channel().remoteAddress());
// 设置存储消息的主机地址
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
// 属性中添加集群名称
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
// 如果属性中包含PROPERTY_WAIT_STORE_MSG_OK
if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
// 设置消息属性
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
} else {
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
}
CompletableFuture<PutMessageResult> putMessageResult = null;
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 是否使用事务
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
// 事务处理
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
// 消息写入
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
// 返回消息持久化结果
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
}
MessageStore是一个接口,在BrokerController的初始化方法中可以看到,具体使用的是DefaultMessageStore:
public class BrokerController {
private MessageStore messageStore;
public boolean initialize() throws CloneNotSupportedException {
boolean result = this.topicConfigManager.load();
// ...
if (result) {
try {
// 创建DefaultMessageStore
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
// ...
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
// 获取MessageStore
public MessageStore getMessageStore() {
return messageStore;
}
}
DefaultMessageStore中有一个CommitLog类型的成员变量,在DefaultMessageStore中的构造函数中可以看到,如果启用了Dleger,使用的是DLedgerCommitLog,DLedgerCommitLog是CommitLog的子类,如果未启用Dleger,就使用CommitLog自己(接下来会以CommitLog为例)。
在DefaultMessageStore的asyncPutMessage方法中,首先进行了一系列的合法性校验,校验通过后会调用CommitLog的asyncPutMessage进行消息写入:
public class DefaultMessageStore implements MessageStore {
private final CommitLog commitLog; // CommitLog
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
// ...
// 如果启用了Dleger
if (messageStoreConfig.isEnableDLegerCommitLog()) {
// 使用DLedgerCommitLog
this.commitLog = new DLedgerCommitLog(this);
} else {
// 否则使用CommitLog
this.commitLog = new CommitLog(this);
}
// ...
}
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
// 校验存储状态
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
}
// 校验消息合法性
PutMessageStatus msgCheckStatus = this.checkMessage(msg);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
}
// 进行一系列校验
PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);
if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED) {
return CompletableFuture.completedFuture(new PutMessageResult(lmqMsgCheckStatus, null));
}
long beginTime = this.getSystemClock().now();
// 调用CommitLog的asyncPutMessage方法写入消息
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
putResultFuture.thenAccept((result) -> {
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().add(1);
}
});
return putResultFuture;
}
}
checkStoreStatus主要对Broker是否可以写入消息进行检查,包含以下几个方面:
MessageStore是否已经处于关闭状态,如果处于关闭状态不再受理消息的存储 private PutMessageStatus checkStoreStatus() {
// 是否处于停止状态
if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden");
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
// 是否SLAVE角色
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("broke role is slave, so putMessage is forbidden");
}
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
// 是否可写
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
"the broker's disk is full, write to logic queue error, write to index file error, etc");
}
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
} else {
this.printTimes.set(0);
}
// 操作系统是否处于PAGECACHE繁忙状态
if (this.isOSPageCacheBusy()) {
return PutMessageStatus.OS_PAGECACHE_BUSY;
}
return PutMessageStatus.PUT_OK;
}
checkMessage方法主要是对主题的长度校验和消息属性的长度校验:
private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {
// 如果主题的长度大于最大值
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return PutMessageStatus.MESSAGE_ILLEGAL;
}
// 如果消息属性长度大于最大值
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return PutMessageStatus.MESSAGE_ILLEGAL;
}
return PutMessageStatus.PUT_OK;
}
checkLmqMessage主要判断在开启LMQ(Light Message Queue)时是否超过了最大消费数量:
private PutMessageStatus checkLmqMessage(MessageExtBrokerInner msg) {
// 如果消息属性不为空、存在PROPERTY_INNER_MULTI_DISPATCH属性、并且超过了最大消费数量
if (msg.getProperties() != null
&& StringUtils.isNotBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))
&& this.isLmqConsumeQueueNumExceeded()) {
return PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED;
}
return PutMessageStatus.PUT_OK;
}
private boolean isLmqConsumeQueueNumExceeded() {
// 开启了LMQ && 开启了多个队列分发 && 消费数量大于了限定值
if (this.getMessageStoreConfig().isEnableLmq() && this.getMessageStoreConfig().isEnableMultiDispatch()
&& this.lmqConsumeQueueNum.get() > this.messageStoreConfig.getMaxLmqConsumeQueueNum()) {
return true;
}
return false;
}
对消息进行校验完毕之后,调用了CommitLog的asyncPutMessage进行消息写入,为了简单起见,这里我们先不考虑事务,处理流程如下:
首先对消息的相关属性进行了设置,主要包括以下内容
获取当前线程绑定的PutMessageThreadLocal对象,里面有一个MessageExtEncoder类型的成员变量,调用它的encode方法可以对消息进行编码,将数据先写入内存buffer,然后调用MessageExtBrokerInner的setEncodedBuff方法将buffer设置到encodedBuff中
加锁,从mappedFileQueue中获取上一次使用的映射文件mappedFile,并更新消息的存储时间, 如果mappedFile为空或者已写满,说明是第一次写入消息还没有创建文件或者上一次写入的文件已达到规定的大小,需要新建一个文件,如果新建文件为空打印错误日志并返回结果
mappedFile可以看做是每一个Commitlog文件的映射对象,Commitlog文件的大小限定为1G
mappedFileQueue是所有mappedFile的集合,可以理解为CommitLog文件所在的目录
调用mappedFile的appendMessage方法向文件中追加消息数据,在调用方法时传入了回调函数appendMessageCallback,在CommitLog的构造函数中可以看到是DefaultAppendMessageCallback类型的,所以会进入到DefaultAppendMessageCallback中进行消息写入,如果写入成功,数据会留在操作系统的PAGECACHE中
调用submitFlushRequest方法执行刷盘策略,判断是否需要立刻将PAGECACHE中的数据刷到磁盘
public class CommitLog {
// 所有mappedFile集合
protected final MappedFileQueue mappedFileQueue;
// ThreadLocal
private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
// 写入消息的回调函数
private final AppendMessageCallback appendMessageCallback;
public CommitLog(final DefaultMessageStore defaultMessageStore) { // 构造函数
//...
// 创建回调函数
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
//...
}
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// 设置存储时间
msg.setStoreTimestamp(System.currentTimeMillis());
// 设置消息的CRC值
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// 写入结果
AppendMessageResult result = null;
// 获取存储统计服务
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
// 获取主题
String topic = msg.getTopic();
// 获取事务类型
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 省略事务相关处理
}
// 获取发送消息的主机地址
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) { // 如果是IPV6
msg.setBornHostV6Flag(); // 设置IPV6标识
}
// 获取存储消息的主机地址
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag(); // 设置IPV6标识
}
// 获取当前线程绑定的PutMessageThreadLocal对象
PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
// 调用encode方法对消息进行编码,并写入buffer
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null) {
return CompletableFuture.completedFuture(encodeResult);
}
// 将存储编码消息的buffer设置到msg中
msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
// 创建PutMessageContext
PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
// 加锁
putMessageLock.lock();
try {
// 获取上一次写入的文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 获取系统时间戳
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// 再次更新存储时间戳,保证全局顺序
msg.setStoreTimestamp(beginLockTimestamp);
// 如果mapppedFile为空或者已满,说明是第一次写入消息还没有创建文件或者上一次写入的文件已满,需要新建一个文件
if (null == mappedFile || mappedFile.isFull()) {
// 使用偏移量0创建一个新的文件
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
}
// 如果依旧为空
if (null == mappedFile) {
// 提示错误
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
// 写入消息
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
// ...
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
} finally {
beginTimeInLock = 0;
putMessageLock.unlock();
}
// ...
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// 统计相关
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
// 执行刷盘
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
// 返回结果
return putMessageResult;
});
}
}
MessageExtEncoder是CommitLog的一个内部类,它被CommitLog的另外一个内部类PutMessageThreadLocal所引用,ThreadLocal一般用于多线程环境下,为每个线程创建自己的副本变量,从而互不影响,PutMessageThreadLocal在构造函数中对MessageExtEncoder进行了实例化,并指定了创建缓冲区的大小:
public class CommitLog {
// ThreadLocal
private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
// 添加消息的ThreadLocal对象
static class PutMessageThreadLocal {
private MessageExtEncoder encoder; // 引用MessageExtEncoder
private StringBuilder keyBuilder;
PutMessageThreadLocal(int size) {
// 创建MessageExtEncoder,size用来指定分配内存的大小
encoder = new MessageExtEncoder(size);
keyBuilder = new StringBuilder();
}
// ...
}
}
MessageExtEncoder中使用了ByteBuffer作为消息内容存放的缓冲区,上面可知缓冲区的大小是在PutMessageThreadLocal的构造函数中指定的,MessageExtEncoder的encode方法中对消息进了编码并将数据写入分配的缓冲区:
public class CommitLog {
// MessageExtEncoder
public static class MessageExtEncoder {
// 字节缓冲区,存储消息内容的buffer
private final ByteBuffer encoderBuffer;
MessageExtEncoder(final int size) {
// 分配内存
this.encoderBuffer = ByteBuffer.allocateDirect(size);
this.maxMessageSize = size;
}
// 对消息进行编码并写入buffer
protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
// 消息属性数据
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
// 属性数据长度
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
// 校验长度是否超过最大值
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
// 获取主题数据
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;// 主题数据长度
// 获取消息体内容长度
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
// 总消息内容长度
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
// 是否超过最大长度限制
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
// 初始化
this.resetByteBuffer(encoderBuffer, msgLen);
// 1 写入消息长度
this.encoderBuffer.putInt(msgLen);
// 2 写入魔数
this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 写入消息体CRC校验和
this.encoderBuffer.putInt(msgInner.getBodyCRC());
// 4 写入队列ID
this.encoderBuffer.putInt(msgInner.getQueueId());
// 5 写入标识
this.encoderBuffer.putInt(msgInner.getFlag());
// 6 队列的偏移量, 稍后写入
this.encoderBuffer.putLong(0);
// 7 文件的物理偏移量, 稍后写入
this.encoderBuffer.putLong(0);
// 8 写入系统标识
this.encoderBuffer.putInt(msgInner.getSysFlag());
// 9 写入发送消息的时间戳
this.encoderBuffer.putLong(msgInner.getBornTimestamp());
// 10 写入发送消息的主机地址
socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);
// 11 写入存储时间戳
this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
// 12 写入存储消息的主机地址
socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);
// 13 RECONSUMETIMES
this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());
// 15 写入消息体长度
this.encoderBuffer.putInt(bodyLength);
if (bodyLength > 0)
this.encoderBuffer.put(msgInner.getBody());// 写入消息内容
// 16 写入主题长度
this.encoderBuffer.put((byte) topicLength);
// 写入主题
this.encoderBuffer.put(topicData);
// 17 写入属性长度
this.encoderBuffer.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.encoderBuffer.put(propertiesData); // 写入属性数据
encoderBuffer.flip();
return null;
}
}
}
前面提到MappedFile可以看做是每一个Commitlog文件的映射,里面记录了文件的大小以及数据已经写入的位置,还有两个字节缓冲区ByteBuffer和MappedByteBuffer,它们的继承关系如下:

ByteBuffer:字节缓冲区,用于在内存中分配空间,可以在JVM堆中分配内存(HeapByteBuffer),也可以在堆外分配内存(DirectByteBuffer)。
MappedByteBuffer:是ByteBuffer的子类,它是将磁盘的文件内容映射到虚拟地址空间,通过虚拟地址访问物理内存中映射的文件内容,也叫文件映射,可以减少数据的拷贝。
MappedFile提供了两种方式来进行内容的写入,对应不同的init方法:
第一种通过ByteBuffer分配缓冲区并将内容写入缓冲区,并且使用了暂存池对内存进行管理,需要时进行申请,使用完毕后回收,类似于数据库连接池。
第二种是通过MappedByteBuffer,对CommitLog进行文件映射,然后进行消息写入。
综上所述,开启使用暂存池时会使用ByteBuffer,否则使用MappedByteBuffer进行内容写入。
public class MappedFile extends ReferenceResource {
// 记录文件的写入位置
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// 文件大小
protected int fileSize;
// 字节buffer
protected ByteBuffer writeBuffer = null;
// 文件映射
private MappedByteBuffer mappedByteBuffer;
// 暂存池,类似线程池,只不过池中存放的是申请的内存
protected TransientStorePool transientStorePool = null;
// 初始化
public void init(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
// 从暂存池中获取一块内存
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool;
}
// 初始化
private void init(final String fileName, final int fileSize) throws IOException {
// ...
try {
// 获取文件
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
// 进行文件映射
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
// ...
} catch (IOException e) {
// ...
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
}
// 暂存池
public class TransientStorePool {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private final int poolSize; // 暂存池大小
private final int fileSize; // 申请的每一块内存大小
private final Deque<ByteBuffer> availableBuffers; // 双端队列,存放申请的内存
private final MessageStoreConfig storeConfig; // 存储配置
public TransientStorePool(final MessageStoreConfig storeConfig) {
this.storeConfig = storeConfig;
this.poolSize = storeConfig.getTransientStorePoolSize();
this.fileSize = storeConfig.getMappedFileSizeCommitLog();
this.availableBuffers = new ConcurrentLinkedDeque<>();
}
/**
* 初始化
*/
public void init() {
// 根据暂存池大小申请内存
for (int i = 0; i < poolSize; i++) {
// 申请直接内存
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
// 放入到暂存池中
availableBuffers.offer(byteBuffer);
}
}
}
经过之前的步骤,消息内容已经写入到内存缓冲区中,并且也知道准备进行写入的CommitLog对应的映射文件,接下来就可以调用MappedFile的appendMessagesInner方法将内存中的内容写入映射文件,处理逻辑如下:
MappedFile中记录了文件的写入位置,获取准备写入的位置,如果写入的位置小于文件大小,意味着当前文件可以进行内容写入,反之说明此文件已写满,不能继续下一步,需要返回错误信息
如果writeBuffer不为空,使用writeBuffer,否则使用mappedByteBuffer的slice方法创建一个与MappedFile共享的内存区byteBuffer,设置byteBuffer的写入位置,之后通过byteBuffer来进行消息写入,由于是共享内存区域,所以写入的内容会影响到writeBuffer或者mappedByteBuffer中

调用回调函数的doAppend方法进行写入,前面可知回调函数是DefaultAppendMessageCallback类型的
更新MappedFile写入位置,返回写入结果
public class MappedFile extends ReferenceResource {
// 记录文件的写入位置
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// 文件大小
protected int fileSize;
// 字节buffer
protected ByteBuffer writeBuffer = null;
// 文件映射
private MappedByteBuffer mappedByteBuffer;
// 写入消息
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
// 调用appendMessagesInner
return appendMessagesInner(msg, cb, putMessageContext);
}
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
assert messageExt != null;
assert cb != null;
// 获取写入位置
int currentPos = this.wrotePosition.get();
// 如果写指针小于文件大小
if (currentPos < this.fileSize) {
// 如果writeBuffer不为空,使用writeBuffer的slice方法创建共享内存区,否则使用mappedByteBuffer
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
// 设置共享内存区的写入位置
byteBuffer.position(currentPos);
AppendMessageResult result;
if (messageExt instanceof MessageExtBrokerInner) { // 单个消息处理
// 通过共享内存区byteBuffer写入数据
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
} else if (messageExt instanceof MessageExtBatch) { // 批量消息
// 通过共享内存区byteBuffer写入数据
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBatch) messageExt, putMessageContext);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
// 更新MappedFile的写入位置
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
}
进入到DefaultAppendMessageCallback的doAppend方法中,首先来看方法的入参:
方法的处理逻辑如下:
计算文件要写入位置偏移量:文件起始位置偏移量 + 准备写入位置的偏移量
从消息写入上下文中获取主题所属队列的KEY,根据KEY从主题队列路由表中获取队列偏移量,如果获取为空,将偏移量初始化为0并加入到路由表中
从msgInner中获取之前已经写入到内存的消息数据preEncodeBuffer,并获取消息内容的长度
校验是否有足够的空间写入数据,如果消息长度 + END_FILE_MIN_BLANK_LENGTH(预留空间大小) 大于剩余空间,说明超出了限定的文件大小,此时只将文件大小和魔数写入文件,然后返回写入结果,结果类型为END_OF_FILE(超过文件大小)。
这里可以看出每个CommitLog文件需要预留一部分空间(8个字节)用于存储文件大小和魔数。
计算队列偏移量在preEncodeBuffer中的位置,之前在编码消息步骤时并未写入队列的偏移量值的大小,这里需要找到对应位置更新队列偏移量的值
再次更新消息的存储时间,并将preEncodeBuffer的内容写入文件共享缓冲区byteBuffer,**此时消息内容已经写入文件对应的内存buffer中,驻留在操作系统的PAGECACHE中,接下来需要根据刷盘策略决定何时将内容保存到硬盘中。 **
消息写入结果
public class CommitLog {
class DefaultAppendMessageCallback implements AppendMessageCallback {
// 预留空间大小,8个字节
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
// 计算写入位置物理偏移量:文件起始位置 + 准备写入位置的偏移量
long wroteOffset = fileFromOffset + byteBuffer.position();
Supplier<String> msgIdSupplier = () -> {
int sysflag = msgInner.getSysFlag();
int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
return UtilAll.bytes2string(msgIdBuffer.array());
};
// 获取消息队列信息
String key = putMessageContext.getTopicQueueTableKey();
// 从主题队列路由表中获取队列偏移量
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
// 如果偏移量为空
if (null == queueOffset) {
queueOffset = 0L; // 初始化为0
// 添加到路由表中
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner);
if (!multiDispatchWrapResult) {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
// 如果开启事务需要特殊处理
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
// ...
// 获取之前已经写入到buffer的消息数据
ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
// 获取数据长度
final int msgLen = preEncodeBuffer.getInt(0);
// 校验是否有足够的空间写入数据,如果消息长度 + 预留空间大小 大于最大值
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.msgStoreItemMemory.clear();
// 1 设置文件大小
this.msgStoreItemMemory.putInt(maxBlank);
// 2 写入魔数
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 开始时间
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// 将文件大小和魔数写入buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
// 返回写入结果,由于剩余空间不足以写入消息内容,这里返回类型为END_OF_FILE
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
msgIdSupplier, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
// 计算队列偏移量的位置
int pos = 4 + 4 + 4 + 4 + 4;
// 6 写入队列偏移量
preEncodeBuffer.putLong(pos, queueOffset);
pos += 8;
// 7 写入物理偏移量
preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// 8 系统标识, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
pos += 8 + 4 + 8 + ipLen; // 计算存储时间戳的写入位置
// 更新新存储时间戳
preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// 将preEncodeBuffer的数据写入byteBuffer
byteBuffer.put(preEncodeBuffer);
// 清空buffer
msgInner.setEncodedBuff(null);
// 设置返回结果
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
break;
default:
break;
}
return result;
}
}
}
由于篇幅原因,刷盘机制将另写一篇文章。
总结

参考
丁威、周继锋《RocketMQ技术内幕》
https://github.com/apache/rocketmq/blob/develop/docs/cn/Example_LMQ.md
RocketMQ版本:4.9.3
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c
我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=
我正在编写一个简单的静态Rack应用程序。查看下面的config.ru代码:useRack::Static,:urls=>["/elements","/img","/pages","/users","/css","/js"],:root=>"archive"map'/'dorunProc.new{|env|[200,{'Content-Type'=>'text/html','Cache-Control'=>'public,max-age=6400'},File.open('archive/splash.html',File::RDONLY)]}endmap'/pages/search.
我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi
如果我在模型中设置验证消息validates:name,:presence=>{:message=>'Thenamecantbeblank.'}我如何让该消息显示在闪光警报中,这是我迄今为止尝试过的方法defcreate@message=Message.new(params[:message])if@message.valid?ContactMailer.send_mail(@message).deliverredirect_to(root_path,:notice=>"Thanksforyourmessage,Iwillbeintouchsoon")elseflash[:error]
RSpec似乎按顺序匹配方法接收的消息。我不确定如何使以下代码工作:allow(a).toreceive(:f)expect(a).toreceive(:f).with(2)a.f(1)a.f(2)a.f(3)我问的原因是a.f的一些调用是由我的代码的上层控制的,所以我不能对这些方法调用添加期望。 最佳答案 RSpecspy是测试这种情况的一种方式。要监视一个方法,用allowstub,除了方法名称之外没有任何约束,调用该方法,然后expect确切的方法调用。例如:allow(a).toreceive(:f)a.f(2)a.f(1)
我去了这个website查看Rails5.0.0和Rails5.1.1之间的区别为什么5.1.1不再包含:config/initializers/session_store.rb?谢谢 最佳答案 这是删除它的提交:Setupdefaultsessionstoreinternally,nolongerthroughanapplicationinitializer总而言之,新应用没有该初始化器,session存储默认设置为cookie存储。即与在该初始值设定项的生成版本中指定的值相同。 关于
我正在关注Hartl的railstutorial.org并已到达11.4.4:Imageuploadinproduction.我做了什么:注册亚马逊网络服务在AmazonIdentityandAccessManagement中,我创建了一个用户。用户创建成功。在AmazonS3中,我创建了一个新存储桶。设置新存储桶的权限:权限:本教程指示“授予上一步创建的用户读写权限”。但是,在存储桶的“权限”下,未提及新用户名。我只能在每个人、经过身份验证的用户、日志传送、我和亚马逊似乎根据我的名字+数字创建的用户名之间进行选择。我已经通过选择经过身份验证的用户并选中了上传/删除和查看权限的框(而不
我正在使用mechanize登录网站,然后检索页面。我遇到了一些问题,我怀疑这是由于cookie中的某些值造成的。当Mechanize登录网站时,我假设它存储了cookie。如何通过Mechanize打印出存储在cookie中的所有数据? 最佳答案 代理有一个cookie方法。agent=Mechanize.newpage=agent.get("http://www.google.com/")agent.cookiesagent.cookies.to_scookie返回一个Mechanize::Cookiesobject