我在我的开源项目中使用 mongodb 已经一年多了,最近我决定尝试一下事务处理。在为使用事务的方法编写了一些测试后,我发现它们会抛出一些奇怪的异常,但我无法弄清楚问题出在哪里。所以我有一个使用自定义 coroutine context 和 mutex 的 method delete:
open suspend fun delete(photoInfo: PhotoInfo): Boolean {
return withContext(coroutineContext) {
return@withContext mutex.withLock {
return@withLock deletePhotoInternalInTransaction(photoInfo)
}
}
}
然后调用 method 执行一些删除操作:
//FIXME: doesn't work in tests
//should be called from within locked mutex
private suspend fun deletePhotoInternalInTransaction(photoInfo: PhotoInfo): Boolean {
check(!photoInfo.isEmpty())
val transactionMono = template.inTransaction().execute { txTemplate ->
return@execute photoInfoDao.deleteById(photoInfo.photoId, txTemplate)
.flatMap { favouritedPhotoDao.deleteFavouriteByPhotoName(photoInfo.photoName, txTemplate) }
.flatMap { reportedPhotoDao.deleteReportByPhotoName(photoInfo.photoName, txTemplate) }
.flatMap { locationMapDao.deleteById(photoInfo.photoId, txTemplate) }
.flatMap { galleryPhotoDao.deleteByPhotoName(photoInfo.photoName, txTemplate) }
}.next()
return try {
transactionMono.awaitFirst()
true
} catch (error: Throwable) {
logger.error("Could not delete photo", error)
false
}
}
这里我有五个操作,从五个不同的文档中删除数据。以下是其中一项操作的示例:
open fun deleteById(photoId: Long, template: ReactiveMongoOperations = reactiveTemplate): Mono<Boolean> {
val query = Query()
.addCriteria(Criteria.where(PhotoInfo.Mongo.Field.PHOTO_ID).`is`(photoId))
return template.remove(query, PhotoInfo::class.java)
.map { deletionResult -> deletionResult.wasAcknowledged() }
.doOnError { error -> logger.error("DB error", error) }
.onErrorReturn(false)
}
如果任一删除操作失败,我希望此操作失败,所以我使用事务。
然后我有一些 tests 用于使用此 delete 方法的处理程序:
@Test
fun `photo should not be uploaded if could not enqueue static map downloading request`() {
val webClient = getWebTestClient()
val userId = "1234235236"
val token = "fwerwe"
runBlocking {
Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(false)
}
kotlin.run {
val packet = UploadPhotoPacket(33.4, 55.2, userId, true)
val multipartData = createTestMultipartFile(PHOTO1, packet)
val content = webClient
.post()
.uri("/v1/api/upload")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(multipartData))
.exchange()
.expectStatus().is5xxServerError
.expectBody()
val response = fromBodyContent<UploadPhotoResponse>(content)
assertEquals(ErrorCode.DatabaseError.value, response.errorCode)
assertEquals(0, findAllFiles().size)
runBlocking {
assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
}
}
}
@Test
fun `photo should not be uploaded when resizeAndSavePhotos throws an exception`() {
val webClient = getWebTestClient()
val userId = "1234235236"
val token = "fwerwe"
runBlocking {
Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(true)
Mockito.doThrow(IOException("BAM"))
.`when`(diskManipulationService).resizeAndSavePhotos(any(), any())
}
kotlin.run {
val packet = UploadPhotoPacket(33.4, 55.2, userId, true)
val multipartData = createTestMultipartFile(PHOTO1, packet)
val content = webClient
.post()
.uri("/v1/api/upload")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(multipartData))
.exchange()
.expectStatus().is5xxServerError
.expectBody()
val response = fromBodyContent<UploadPhotoResponse>(content)
assertEquals(ErrorCode.ServerResizeError.value, response.errorCode)
assertEquals(0, findAllFiles().size)
runBlocking {
assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
}
}
}
@Test
fun `photo should not be uploaded when copyDataBuffersToFile throws an exception`() {
val webClient = getWebTestClient()
val userId = "1234235236"
val token = "fwerwe"
runBlocking {
Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(true)
Mockito.doThrow(IOException("BAM"))
.`when`(diskManipulationService).copyDataBuffersToFile(Mockito.anyList(), any())
}
kotlin.run {
val packet = UploadPhotoPacket(33.4, 55.2, userId, true)
val multipartData = createTestMultipartFile(PHOTO1, packet)
val content = webClient
.post()
.uri("/v1/api/upload")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(multipartData))
.exchange()
.expectStatus().is5xxServerError
.expectBody()
val response = fromBodyContent<UploadPhotoResponse>(content)
assertEquals(ErrorCode.ServerDiskError.value, response.errorCode)
assertEquals(0, findAllFiles().size)
runBlocking {
assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
}
}
}
通常第一个测试通过:
以下两个失败,但出现以下异常:
17:09:01.228 [Thread-17] ERROR com.kirakishou.photoexchange.database.dao.PhotoInfoDao - DB error
org.springframework.data.mongodb.UncategorizedMongoDbException: Command failed with error 24 (LockTimeout): 'Unable to acquire lock '{8368122972467948263: Database, 1450593944826866407}' within a max lock request timeout of '5ms' milliseconds.' on server 192.168.99.100:27017.
然后:
Caused by: com.mongodb.MongoCommandException: Command failed with error 246 (SnapshotUnavailable): 'Unable to read from a snapshot due to pending collection catalog changes; please retry the operation. Snapshot timestamp is Timestamp(1545661357, 23). Collection minimum is Timestamp(1545661357, 24)' on server 192.168.99.100:27017.
和:
17:22:36.951 [Thread-16] WARN reactor.core.publisher.FluxUsingWhen - Async resource cleanup failed after cancel
com.mongodb.MongoCommandException: Command failed with error 251 (NoSuchTransaction): 'Transaction 1 has been aborted.' on server 192.168.99.100:27017.
有时其中两个通过而最后一个失败。
看起来只有第一个事务成功,任何后续事务都会失败,我想原因是我必须手动关闭它(或 ClientSession)。但我找不到有关如何关闭交易/ session 的任何信息。 Here 是我能找到的为数不多的几个例子之一,他们在这些例子中使用了带有反应模板的交易,而且我没有看到他们做任何额外的事情来关闭交易/ session 。
或者可能是因为我在模拟一个在事务中抛出异常的方法?也许在这种情况下它没有被关闭?
最佳答案
客户端 session /事务已正确关闭,但在测试中创建的索引似乎正在获取全局锁,导致下一个事务锁落后并等待锁请求超时。
基本上,您必须管理您的索引创建,这样它们就不会干扰来自客户的交易。
一个快速修复方法是通过在 shell 中运行以下命令来增加锁定超时。
db.adminCommand( { setParameter: 1, maxTransactionLockRequestTimeoutMillis: 50 } )
在生产中你可以查看事务错误标签 并重试该操作。
关于MongoDB react 模板事务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53914707/
我正在使用puppet为ruby程序提供一组常量。我需要提供一组主机名,我的程序将对其进行迭代。在我之前使用的bash脚本中,我只是将它作为一个puppet变量hosts=>"host1,host2"我将其提供给bash脚本作为HOSTS=显然这对ruby不太适用——我需要它的格式hosts=["host1","host2"]自从phosts和putsmy_array.inspect提供输出["host1","host2"]我希望使用其中之一。不幸的是,我终其一生都无法弄清楚如何让它发挥作用。我尝试了以下各项:我发现某处他们指出我需要在函数调用前放置“function_”……这
我发现ActiveRecord::Base.transaction在复杂方法中非常有效。我想知道是否可以在如下事务中从AWSS3上传/删除文件:S3Object.transactiondo#writeintofiles#raiseanexceptionend引发异常后,每个操作都应在S3上回滚。S3Object这可能吗?? 最佳答案 虽然S3API具有批量删除功能,但它不支持事务,因为每个删除操作都可以独立于其他操作成功/失败。该API不提供任何批量上传功能(通过PUT或POST),因此每个上传操作都是通过一个独立的API调用完成的
我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和
我正在使用Mandrill的RubyAPIGem并使用以下简单的测试模板:testastic按照Heroku指南中的示例,我有以下Ruby代码:require'mandrill'm=Mandrill::API.newrendered=m.templates.render'test-template',[{:header=>'someheadertext',:main_section=>'Themaincontentblock',:footer=>'asdf'}]mail(:to=>"JaysonLane",:subject=>"TestEmail")do|format|format.h
所以这可能有点令人困惑,但请耐心等待。简而言之,我想遍历具有特定键值的所有属性,然后如果值不为空,则将它们插入到模板中。这是我的代码:属性:#===DefaultfileConfigurations#default['elasticsearch']['default']['ES_USER']=''default['elasticsearch']['default']['ES_GROUP']=''default['elasticsearch']['default']['ES_HEAP_SIZE']=''default['elasticsearch']['default']['MAX_OP
我已经开始使用mysql2gem。我试图弄清楚一些基本的事情——其中之一是如何明确地执行事务(对于批处理操作,比如多个INSERT/UPDATE查询)。在旧的ruby-mysql中,这是我的方法:client=Mysql.real_connect(...)inserts=["INSERTINTO...","UPDATE..WHEREid=..",#etc]client.autocommit(false)inserts.eachdo|ins|beginclient.query(ins)rescue#handleerrorsorabortentirelyendendclient.commi
保存成功后可以回滚吗?让我有一个带有属性名称、电子邮件等的用户模型。例如u=User.newu.name="test_name"u.email="test@email.com"u.save现在记录将成功保存在数据库中,之后我想回滚我的事务(不是销毁或删除)。有什么想法吗? 最佳答案 您可以通过交易来做到这一点,请参阅http://markdaggett.com/blog/2011/12/01/transactions-in-rails/例子:User.transactiondoUser.create(:username=>'Nemu
一般来说,我是Middleman和ruby的新手。我已经安装了Ruby我已经安装了Middleman和gem以使其运行。我需要使用slim而不是默认的模板系统。所以我安装了Slimgem。Slim的网站只说我需要'slim'才能让它工作。中间人网站说我只需要在config.rb文件中添加模板引擎,但是没有给出例子...对于没有ruby背景的人来说,这没有帮助。我在git上找了几个config.rb,它们都有:require'slim'和#Setslim-langoutputstyleSlim::Engine.set_default_options:pretty=>true#Se
我有一个偏爱:如何将像o.office这样的值插入到属性中?value="#{o.office}"无效。 最佳答案 'type='text'/>或者你可以使用表单助手 关于ruby-on-rails-如何将变量值插入ERB模板中的HTML标签?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/6172174/
有没有办法在liquidtemplate中输出(用于调试/信息目的)可用对象和对象属性??也就是说,假设我正在使用jekyll站点生成工具,并且我在我的index.html模板中(据我所知,这是一个液体模板)。它可能看起来像这样{%forpostinsite.posts%}{{post.date|date_to_string}}»{{post.title}}{%endfor%}是否有任何我可以使用的模板标签会告诉我/输出名为post的变量在此模板(以及其他模板)中可用。此外,是否有任何模板标签可以告诉我post对象具有键date、title、url、摘录、永久链接等