这与另一个超过 3 年的问题非常相似:What's a good general way to look SQLAlchemy transactions, complete with authenticated user, etc?
我正在开发一个应用程序,我想在其中记录对特定表的所有更改。当前有一个 really good "recipe" that does versioning ,但我需要修改它以记录更改发生时的日期时间和进行更改的用户 ID。我采用了与 SQLAlchemy 打包在一起的 history_meta.py 示例,并让它记录时间而不是版本号,但我无法弄清楚如何传递用户 ID。
我上面提到的问题建议在 session 对象中包含用户 ID。这很有道理,但我不确定该怎么做。我尝试了一些简单的操作,例如 session.userid = authenticated_userid(request) 但在 history_meta.py 中,该属性似乎不再位于 session 对象上。
我在 Pyramid 框架中执行所有这些操作,我使用的 session 对象定义为 DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))。在一个 View 中,我执行 session = DBSession() 然后继续使用 session。 (我不确定这是否有必要,但这就是正在发生的事情)
这是我修改过的 history_meta.py 以防有人发现它有用:
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import mapper, class_mapper, attributes, object_mapper
from sqlalchemy.orm.exc import UnmappedClassError, UnmappedColumnError
from sqlalchemy import Table, Column, ForeignKeyConstraint, DateTime
from sqlalchemy import event
from sqlalchemy.orm.properties import RelationshipProperty
from datetime import datetime
def col_references_table(col, table):
for fk in col.foreign_keys:
if fk.references(table):
return True
return False
def _history_mapper(local_mapper):
cls = local_mapper.class_
# set the "active_history" flag
# on on column-mapped attributes so that the old version
# of the info is always loaded (currently sets it on all attributes)
for prop in local_mapper.iterate_properties:
getattr(local_mapper.class_, prop.key).impl.active_history = True
super_mapper = local_mapper.inherits
super_history_mapper = getattr(cls, '__history_mapper__', None)
polymorphic_on = None
super_fks = []
if not super_mapper or local_mapper.local_table is not super_mapper.local_table:
cols = []
for column in local_mapper.local_table.c:
if column.name == 'version_datetime':
continue
col = column.copy()
col.unique = False
if super_mapper and col_references_table(column, super_mapper.local_table):
super_fks.append((col.key, list(super_history_mapper.local_table.primary_key)[0]))
cols.append(col)
if column is local_mapper.polymorphic_on:
polymorphic_on = col
if super_mapper:
super_fks.append(('version_datetime', super_history_mapper.base_mapper.local_table.c.version_datetime))
cols.append(Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=True))
else:
cols.append(Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=True))
if super_fks:
cols.append(ForeignKeyConstraint(*zip(*super_fks)))
table = Table(local_mapper.local_table.name + '_history', local_mapper.local_table.metadata,
*cols
)
else:
# single table inheritance. take any additional columns that may have
# been added and add them to the history table.
for column in local_mapper.local_table.c:
if column.key not in super_history_mapper.local_table.c:
col = column.copy()
col.unique = False
super_history_mapper.local_table.append_column(col)
table = None
if super_history_mapper:
bases = (super_history_mapper.class_,)
else:
bases = local_mapper.base_mapper.class_.__bases__
versioned_cls = type.__new__(type, "%sHistory" % cls.__name__, bases, {})
m = mapper(
versioned_cls,
table,
inherits=super_history_mapper,
polymorphic_on=polymorphic_on,
polymorphic_identity=local_mapper.polymorphic_identity
)
cls.__history_mapper__ = m
if not super_history_mapper:
local_mapper.local_table.append_column(
Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=False)
)
local_mapper.add_property("version_datetime", local_mapper.local_table.c.version_datetime)
class Versioned(object):
@declared_attr
def __mapper_cls__(cls):
def map(cls, *arg, **kw):
mp = mapper(cls, *arg, **kw)
_history_mapper(mp)
return mp
return map
def versioned_objects(iter):
for obj in iter:
if hasattr(obj, '__history_mapper__'):
yield obj
def create_version(obj, session, deleted = False):
obj_mapper = object_mapper(obj)
history_mapper = obj.__history_mapper__
history_cls = history_mapper.class_
obj_state = attributes.instance_state(obj)
attr = {}
obj_changed = False
for om, hm in zip(obj_mapper.iterate_to_root(), history_mapper.iterate_to_root()):
if hm.single:
continue
for hist_col in hm.local_table.c:
if hist_col.key == 'version_datetime':
continue
obj_col = om.local_table.c[hist_col.key]
# get the value of the
# attribute based on the MapperProperty related to the
# mapped column. this will allow usage of MapperProperties
# that have a different keyname than that of the mapped column.
try:
prop = obj_mapper.get_property_by_column(obj_col)
except UnmappedColumnError:
# in the case of single table inheritance, there may be
# columns on the mapped table intended for the subclass only.
# the "unmapped" status of the subclass column on the
# base class is a feature of the declarative module as of sqla 0.5.2.
continue
# expired object attributes and also deferred cols might not be in the
# dict. force it to load no matter what by using getattr().
if prop.key not in obj_state.dict:
getattr(obj, prop.key)
a, u, d = attributes.get_history(obj, prop.key)
if d:
attr[hist_col.key] = d[0]
obj_changed = True
elif u:
attr[hist_col.key] = u[0]
else:
# if the attribute had no value.
attr[hist_col.key] = a[0]
obj_changed = True
if not obj_changed:
# not changed, but we have relationships. OK
# check those too
for prop in obj_mapper.iterate_properties:
if isinstance(prop, RelationshipProperty) and \
attributes.get_history(obj, prop.key).has_changes():
obj_changed = True
break
if not obj_changed and not deleted:
return
attr['version_datetime'] = obj.version_datetime
hist = history_cls()
for key, value in attr.items():
setattr(hist, key, value)
session.add(hist)
print(dir(session))
obj.version_datetime = datetime.now()
def versioned_session(session):
@event.listens_for(session, 'before_flush')
def before_flush(session, flush_context, instances):
for obj in versioned_objects(session.dirty):
create_version(obj, session)
for obj in versioned_objects(session.deleted):
create_version(obj, session, deleted = True)
更新:
好的,在 before_flush() 方法中,我得到的 session 似乎是 sqlalchemy.orm.session.Session 类型,其中我将 user_id 附加到的 session 是 sqlalchemy.orm.scoping.scoped_session。所以,在某些时候,一个对象层被剥离了。将 user_id 分配给 scoped_session 中的 Session 是否安全?我可以确定它不会出现在其他请求中吗?
最佳答案
老问题,但仍然很相关。
您应该避免尝试将网络 session 信息放在数据库 session 中。它结合了不相关的问题,每个问题都有自己的生命周期(不匹配)。这是我在带有 SQLAlchemy 的 Flask 中使用的一种方法(不是 Flask-SQLAlchemy,但它也应该有效)。我试图评论 Pyramid 的不同之处。
from flask import has_request_context # How to check if in a Flask session
from sqlalchemy import inspect
from sqlalchemy.orm import class_mapper
from sqlalchemy.orm.attributes import get_history
from sqlalchemy.event import listen
from YOUR_SESSION_MANAGER import get_user # This would be something in Pyramid
from my_project import models # Where your models are defined
def get_object_changes(obj):
""" Given a model instance, returns dict of pending
changes waiting for database flush/commit.
e.g. {
'some_field': {
'before': *SOME-VALUE*,
'after': *SOME-VALUE*
},
...
}
"""
inspection = inspect(obj)
changes = {}
for attr in class_mapper(obj.__class__).column_attrs:
if getattr(inspection.attrs, attr.key).history.has_changes():
if get_history(obj, attr.key)[2]:
before = get_history(obj, attr.key)[2].pop()
after = getattr(obj, attr.key)
if before != after:
if before or after:
changes[attr.key] = {'before': before, 'after': after}
return changes
def my_model_change_listener(mapper, connection, target):
changes = get_object_changes(target)
changes.pop("modify_ts", None) # remove fields you don't want to track
user_id = None
if has_request_context():
# Call your function to get active user and extract id
user_id = getattr(get_user(), 'id', None)
if user_id is None:
# What do you want to do if user can't be determined
pass
# You now have the model instance (target), the user_id who is logged in,
# and a dictionary of changes.
# Either do somthing "quick" with it here or call an async task (e.g.
# Celery) to do something with the information that may take longer
# than you want the request to take.
# Add the listener
listen(models.MyModel, 'after_update', my_model_change_listener)
关于python - SQLAlchemy 记录更改日期和用户,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15952733/
如何正确创建Rails迁移,以便将表更改为MySQL中的MyISAM?目前是InnoDB。运行原始执行语句会更改表,但它不会更新db/schema.rb,因此当在测试环境中重新创建表时,它会返回到InnoDB并且我的全文搜索失败。我如何着手更改/添加迁移,以便将现有表修改为MyISAM并更新schema.rb,以便我的数据库和相应的测试数据库得到相应更新? 最佳答案 我没有找到执行此操作的好方法。您可以像有人建议的那样更改您的schema.rb,然后运行:rakedb:schema:load,但是,这将覆盖您的数据。我的做法是(假设
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我在我的Rails项目中使用Pow和powifygem。现在我尝试升级我的ruby版本(从1.9.3到2.0.0,我使用RVM)当我切换ruby版本、安装所有gem依赖项时,我通过运行railss并访问localhost:3000确保该应用程序正常运行以前,我通过使用pow访问http://my_app.dev来浏览我的应用程序。升级后,由于错误Bundler::RubyVersionMismatch:YourRubyversionis1.9.3,butyourGemfilespecified2.0.0,此url不起作用我尝试过的:重新创建pow应用程序重启pow服务器更新战俘
我将应用程序升级到Rails4,一切正常。我可以登录并转到我的编辑页面。也更新了观点。使用标准View时,用户会更新。但是当我添加例如字段:name时,它不会在表单中更新。使用devise3.1.1和gem'protected_attributes'我需要在设备或数据库上运行某种更新命令吗?我也搜索过这个地方,找到了许多不同的解决方案,但没有一个会更新我的用户字段。我没有添加任何自定义字段。 最佳答案 如果您想允许额外的参数,您可以在ApplicationController中使用beforefilter,因为Rails4将参数
Sinatra新手;我正在运行一些rspec测试,但在日志中收到了一堆不需要的噪音。如何消除日志中过多的噪音?我仔细检查了环境是否设置为:test,这意味着记录器级别应设置为WARN而不是DEBUG。spec_helper:require"./app"require"sinatra"require"rspec"require"rack/test"require"database_cleaner"require"factory_girl"set:environment,:testFactoryGirl.definition_file_paths=%w{./factories./test/
我尝试使用不同的ssh_options在同一阶段运行capistranov.3任务。我的production.rb说:set:stage,:productionset:user,'deploy'set:ssh_options,{user:'deploy'}通过此配置,capistrano与用户deploy连接,这对于其余的任务是正确的。但是我需要将它连接到服务器中配置良好的an_other_user以完成一项特定任务。然后我的食谱说:...taskswithoriginaluser...task:my_task_with_an_other_userdoset:user,'an_othe
我想设置一个默认日期,例如实际日期,我该如何设置?还有如何在组合框中设置默认值顺便问一下,date_field_tag和date_field之间有什么区别? 最佳答案 试试这个:将默认日期作为第二个参数传递。youcorrectlysetthedefaultvalueofcomboboxasshowninyourquestion. 关于ruby-on-rails-date_field_tag,如何设置默认日期?[rails上的ruby],我们在StackOverflow上找到一个类似的问
我需要检查DateTime是否采用有效的ISO8601格式。喜欢:#iso8601?我检查了ruby是否有特定方法,但没有找到。目前我正在使用date.iso8601==date来检查这个。有什么好的方法吗?编辑解释我的环境,并改变问题的范围。因此,我的项目将使用jsapiFullCalendar,这就是我需要iso8601字符串格式的原因。我想知道更好或正确的方法是什么,以正确的格式将日期保存在数据库中,或者让ActiveRecord完成它们的工作并在我需要时间信息时对其进行操作。 最佳答案 我不太明白你的问题。我假设您想检查
我的日期格式如下:"%d-%m-%Y"(例如,今天的日期为07-09-2015),我想看看是不是在过去的七天内。谁能推荐一种方法? 最佳答案 你可以这样做:require"date"Date.today-7 关于ruby-检查日期是否在过去7天内,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/32438063/
我有两个Rails模型,即Invoice和Invoice_details。一个Invoice_details属于Invoice,一个Invoice有多个Invoice_details。我无法使用accepts_nested_attributes_forinInvoice通过Invoice模型保存Invoice_details。我收到以下错误:(0.2ms)BEGIN(0.2ms)ROLLBACKCompleted422UnprocessableEntityin25ms(ActiveRecord:4.0ms)ActiveRecord::RecordInvalid(Validationfa