草庐IT

python - 开始使用 Python 进行安全的 AWS CloudFront 流传输

coder 2023-05-22 原文

我已经创建了一个 S3 存储桶,上传了一个视频,在 CloudFront 中创建了一个流分配。使用静态 HTML 播放器对其进行了测试,并且可以正常工作。我通过帐户设置创建了一个 key 对。目前我的桌面上有私钥文件。那就是我所在的地方。

我的目标是让我的 Django/Python 站点创建安全的 URL,并且人们无法访问视频,除非他们来自我的一个页面。问题是我对亚马逊的布局方式过敏,而且我越来越困惑。

我意识到这不是 StackOverflow 上最好的问题,但我敢肯定,我不能成为这里唯一一个无法对如何设置安全的 CloudFront/S3 情况做出正面或反面的傻瓜。我非常感谢您的帮助,并愿意(两天过去后)对最佳答案给予 500pt 赏金。

我有几个问题,一旦得到回答,就应该符合如何完成我所追求的一个解释:

  • 在文档中(下一点有一个例子)有很多 XML 告诉我我需要 POST东西到各个地方。是否有用于执行此操作的在线控制台?或者我真的必须通过 cURL (et al) 强制执行此操作?
  • 如何为 CloudFront 创建源访问身份并将其绑定(bind)到我的分配?我已阅读 this document但是,根据第一点,不知道如何处理它。我的 key 对如何适应这个?
  • 完成后,如何将 S3 存储桶限制为仅允许人们通过该身份下载内容?如果这是另一个 XML 作业,而不是在 Web UI 上单击,请告诉我应该在哪里以及如何将其放入我的帐户。
  • 在 Python 中,为文件生成过期 URL 的最简单方法是什么。我有 boto已安装,但我不知道如何从流分发中获取文件。
  • 是否有任何应用程序或脚本可以解决设置此问题的困难?我使用 Ubuntu (Linux),但如果它仅适用于 Windows,那么我在 VM 中有 XP。我已经看过 CloudBerry S3 Explorer Pro - 但它和在线 UI 一样有意义。
  • 最佳答案

    你是对的,需要大量的 API 工作来完成这个设置。我希望他们尽快在 AWS 控制台中获得它!

    更新:我已将此代码提交给 boto - 从 boto v2.1(2011-10-27 发布)开始,这变得容易多了。对于 boto < 2.1,请使用此处的说明。对于="" boto="" 2.1="">http://www.secretmike.com/2011/10/aws-cloudfront-secure-streaming.html一旦 boto v2.1 被更多发行版打包,我将在这里更新答案。

    要完成您想要的操作,您需要执行以下步骤,我将在下面详细介绍:

  • 创建您的 s3 存储桶并上传一些对象(您已经完成了)
  • 创建一个 Cloudfront“源访问身份”(基本上是一个 AWS 帐户,允许 cloudfront 访问您的 s3 存储桶)
  • 修改对象上的 ACL,以便仅允许您的 Cloudfront Origin Access Identity 读取它们(这可以防止人们绕过 Cloudfront 并直接进入 s3)
  • 创建一个带有基本 URL 和一个需要签名 URL 的 cloudfront 分发
  • 测试您可以从基本的 cloudfront 发行版下载对象,但不能从 s3 或签名的 cloudfront 发行版下载对象
  • 创建用于签名 URL 的 key 对
  • 使用 Python 生成一些 URL
  • 测试签名 URL 是否有效


  • 1 - 创建 Bucket 并上传对象

    最简单的方法是通过 AWS 控制台,但为了完整起见,我将展示如何使用 boto。 Boto 代码如下所示:
    import boto
    
    #credentials stored in environment AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
    s3 = boto.connect_s3()
    
    #bucket name MUST follow dns guidelines
    new_bucket_name = "stream.example.com"
    bucket = s3.create_bucket(new_bucket_name)
    
    object_name = "video.mp4"
    key = bucket.new_key(object_name)
    key.set_contents_from_filename(object_name)
    

    2 - 创建 Cloudfront“源访问身份”

    目前,此步骤只能使用 API 执行。博托代码在这里:
    import boto
    
    #credentials stored in environment AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
    cf = boto.connect_cloudfront()
    
    oai = cf.create_origin_access_identity(comment='New identity for secure videos')
    
    #We need the following two values for later steps:
    print("Origin Access Identity ID: %s" % oai.id)
    print("Origin Access Identity S3CanonicalUserId: %s" % oai.s3_user_id)
    

    3 - 修改对象上的 ACL

    现在我们已经有了我们的特殊 S3 用户帐户(我们在上面创建的 S3CanonicalUserId),我们需要让它访问我们的 s3 对象。我们可以使用 AWS 控制台轻松完成此操作,方法是打开对象的(不是存储桶的!)权限选项卡,单击“添加更多权限”按钮,然后将我们在上面获得的很长的 S3CanonicalUserId 粘贴到新的“被授予者”字段中。确保您授予新权限“打开/下载”权限。

    您还可以使用以下 boto 脚本在代码中执行此操作:
    import boto
    
    #credentials stored in environment AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
    s3 = boto.connect_s3()
    
    bucket_name = "stream.example.com"
    bucket = s3.get_bucket(bucket_name)
    
    object_name = "video.mp4"
    key = bucket.get_key(object_name)
    
    #Now add read permission to our new s3 account
    s3_canonical_user_id = "<your S3CanonicalUserID from above>"
    key.add_user_grant("READ", s3_canonical_user_id)
    

    4 - 创建云前端分发

    请注意,在撰写本文时尚未正式发布的 2.0 版之前,boto 并不完全支持自定义来源和私有(private)发行版。下面的代码从 boto 2.0 分支中提取了一些代码并将其组合在一起以使其运行,但它并不漂亮。 2.0 分支更优雅地处理了这个问题——如果可能的话,一定要使用它!
    import boto
    from boto.cloudfront.distribution import DistributionConfig
    from boto.cloudfront.exception import CloudFrontServerError
    
    import re
    
    def get_domain_from_xml(xml):
        results = re.findall("<DomainName>([^<]+)</DomainName>", xml)
        return results[0]
    
    #custom class to hack this until boto v2.0 is released
    class HackedStreamingDistributionConfig(DistributionConfig):
    
        def __init__(self, connection=None, origin='', enabled=False,
                     caller_reference='', cnames=None, comment='',
                     trusted_signers=None):
            DistributionConfig.__init__(self, connection=connection,
                                        origin=origin, enabled=enabled,
                                        caller_reference=caller_reference,
                                        cnames=cnames, comment=comment,
                                        trusted_signers=trusted_signers)
    
        #override the to_xml() function
        def to_xml(self):
            s = '<?xml version="1.0" encoding="UTF-8"?>\n'
            s += '<StreamingDistributionConfig xmlns="http://cloudfront.amazonaws.com/doc/2010-07-15/">\n'
    
            s += '  <S3Origin>\n'
            s += '    <DNSName>%s</DNSName>\n' % self.origin
            if self.origin_access_identity:
                val = self.origin_access_identity
                s += '    <OriginAccessIdentity>origin-access-identity/cloudfront/%s</OriginAccessIdentity>\n' % val
            s += '  </S3Origin>\n'
    
    
            s += '  <CallerReference>%s</CallerReference>\n' % self.caller_reference
            for cname in self.cnames:
                s += '  <CNAME>%s</CNAME>\n' % cname
            if self.comment:
                s += '  <Comment>%s</Comment>\n' % self.comment
            s += '  <Enabled>'
            if self.enabled:
                s += 'true'
            else:
                s += 'false'
            s += '</Enabled>\n'
            if self.trusted_signers:
                s += '<TrustedSigners>\n'
                for signer in self.trusted_signers:
                    if signer == 'Self':
                        s += '  <Self/>\n'
                    else:
                        s += '  <AwsAccountNumber>%s</AwsAccountNumber>\n' % signer
                s += '</TrustedSigners>\n'
            if self.logging:
                s += '<Logging>\n'
                s += '  <Bucket>%s</Bucket>\n' % self.logging.bucket
                s += '  <Prefix>%s</Prefix>\n' % self.logging.prefix
                s += '</Logging>\n'
            s += '</StreamingDistributionConfig>\n'
    
            return s
    
        def create(self):
            response = self.connection.make_request('POST',
                '/%s/%s' % ("2010-11-01", "streaming-distribution"),
                {'Content-Type' : 'text/xml'},
                data=self.to_xml())
    
            body = response.read()
            if response.status == 201:
                return body
            else:
                raise CloudFrontServerError(response.status, response.reason, body)
    
    
    cf = boto.connect_cloudfront()
    
    s3_dns_name = "stream.example.com.s3.amazonaws.com"
    comment = "example streaming distribution"
    oai = "<OAI ID from step 2 above like E23KRHS6GDUF5L>"
    
    #Create a distribution that does NOT need signed URLS
    hsd = HackedStreamingDistributionConfig(connection=cf, origin=s3_dns_name, comment=comment, enabled=True)
    hsd.origin_access_identity = oai
    basic_dist = hsd.create()
    print("Distribution with basic URLs: %s" % get_domain_from_xml(basic_dist))
    
    #Create a distribution that DOES need signed URLS
    hsd = HackedStreamingDistributionConfig(connection=cf, origin=s3_dns_name, comment=comment, enabled=True)
    hsd.origin_access_identity = oai
    #Add some required signers (Self means your own account)
    hsd.trusted_signers = ['Self']
    signed_dist = hsd.create()
    print("Distribution with signed URLs: %s" % get_domain_from_xml(signed_dist))
    

    5 - 测试您可以从 cloudfront 下载对象,但不能从 s3

    您现在应该能够验证:
  • stream.example.com.s3.amazonaws.com/video.mp4 - 应该给 AccessDenied
  • signed_distribution.cloudfront.net/video.mp4 - 应该给出 MissingKey(因为 URL 没有签名)
  • basic_distribution.cloudfront.net/video.mp4 - 应该可以正常工作

  • 必须调整测试以与您的流播放器一起使用,但基本思想是只有基本的 cloudfront url 应该工作。

    6 - 为 CloudFront 创建 key 对

    我认为唯一的方法是通过亚马逊的网站。进入您的 AWS“帐户”页面并单击“安全凭证”链接。单击“ key 对”选项卡,然后单击“创建新的 key 对”。这将为您生成一个新的 key 对并自动下载一个私钥文件(pk-xxxxxxxxx.pem)。保持 key 文件的安全和私密。还要记下来自亚马逊的“ key 对 ID”,因为我们将在下一步中需要它。

    7 - 在 Python 中生成一些 URL

    从 boto 2.0 版开始,似乎不支持生成签名的 CloudFront URL。 Python 在标准库中不包含 RSA 加密例程,因此我们将不得不使用额外的库。我在这个例子中使用了 M2Crypto。

    对于非流式分发,您必须使用完整的 cloudfront URL 作为资源,但是对于流式传输,我们仅使用视频文件的对象名称。有关生成仅持续 5 分钟的 URL 的完整示例,请参阅下面的代码。

    此代码松散地基于 Amazon 在 CloudFront 文档中提供的 PHP 示例代码。
    from M2Crypto import EVP
    import base64
    import time
    
    def aws_url_base64_encode(msg):
        msg_base64 = base64.b64encode(msg)
        msg_base64 = msg_base64.replace('+', '-')
        msg_base64 = msg_base64.replace('=', '_')
        msg_base64 = msg_base64.replace('/', '~')
        return msg_base64
    
    def sign_string(message, priv_key_string):
        key = EVP.load_key_string(priv_key_string)
        key.reset_context(md='sha1')
        key.sign_init()
        key.sign_update(str(message))
        signature = key.sign_final()
        return signature
    
    def create_url(url, encoded_signature, key_pair_id, expires):
        signed_url = "%(url)s?Expires=%(expires)s&Signature=%(encoded_signature)s&Key-Pair-Id=%(key_pair_id)s" % {
                'url':url,
                'expires':expires,
                'encoded_signature':encoded_signature,
                'key_pair_id':key_pair_id,
                }
        return signed_url
    
    def get_canned_policy_url(url, priv_key_string, key_pair_id, expires):
        #we manually construct this policy string to ensure formatting matches signature
        canned_policy = '{"Statement":[{"Resource":"%(url)s","Condition":{"DateLessThan":{"AWS:EpochTime":%(expires)s}}}]}' % {'url':url, 'expires':expires}
    
        #now base64 encode it (must be URL safe)
        encoded_policy = aws_url_base64_encode(canned_policy)
        #sign the non-encoded policy
        signature = sign_string(canned_policy, priv_key_string)
        #now base64 encode the signature (URL safe as well)
        encoded_signature = aws_url_base64_encode(signature)
    
        #combine these into a full url
        signed_url = create_url(url, encoded_signature, key_pair_id, expires);
    
        return signed_url
    
    def encode_query_param(resource):
        enc = resource
        enc = enc.replace('?', '%3F')
        enc = enc.replace('=', '%3D')
        enc = enc.replace('&', '%26')
        return enc
    
    
    #Set parameters for URL
    key_pair_id = "APKAIAZCZRKVIO4BQ" #from the AWS accounts page
    priv_key_file = "cloudfront-pk.pem" #your private keypair file
    resource = 'video.mp4' #your resource (just object name for streaming videos)
    expires = int(time.time()) + 300 #5 min
    
    #Create the signed URL
    priv_key_string = open(priv_key_file).read()
    signed_url = get_canned_policy_url(resource, priv_key_string, key_pair_id, expires)
    
    #Flash player doesn't like query params so encode them
    enc_url = encode_query_param(signed_url)
    print(enc_url)
    

    8 - 尝试 URL

    希望你现在应该有一个看起来像这样的工作 URL:
    video.mp4%3FExpires%3D1309979985%26Signature%3DMUNF7pw1689FhMeSN6JzQmWNVxcaIE9mk1x~KOudJky7anTuX0oAgL~1GW-ON6Zh5NFLBoocX3fUhmC9FusAHtJUzWyJVZLzYT9iLyoyfWMsm2ylCDBqpy5IynFbi8CUajd~CjYdxZBWpxTsPO3yIFNJI~R2AFpWx8qp3fs38Yw_%26Key-Pair-Id%3DAPKAIAZRKVIO4BQ
    

    把它放到你的 js 中,你应该有这样的东西(来自亚马逊 CloudFront 文档中的 PHP 示例):
    var so_canned = new SWFObject('http://location.domname.com/~jvngkhow/player.swf','mpl','640','360','9');
        so_canned.addParam('allowfullscreen','true');
        so_canned.addParam('allowscriptaccess','always');
        so_canned.addParam('wmode','opaque');
        so_canned.addVariable('file','video.mp4%3FExpires%3D1309979985%26Signature%3DMUNF7pw1689FhMeSN6JzQmWNVxcaIE9mk1x~KOudJky7anTuX0oAgL~1GW-ON6Zh5NFLBoocX3fUhmC9FusAHtJUzWyJVZLzYT9iLyoyfWMsm2ylCDBqpy5IynFbi8CUajd~CjYdxZBWpxTsPO3yIFNJI~R2AFpWx8qp3fs38Yw_%26Key-Pair-Id%3DAPKAIAZRKVIO4BQ');
        so_canned.addVariable('streamer','rtmp://s3nzpoyjpct.cloudfront.net/cfx/st');
        so_canned.write('canned');
    

    摘要

    正如你所看到的,不是很容易! boto v2 对设置发行版有很大帮助。我会找出是否有可能在那里获得一些 URL 生成代码来改进这个伟大的库!

    关于python - 开始使用 Python 进行安全的 AWS CloudFront 流传输,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6549787/

    有关python - 开始使用 Python 进行安全的 AWS CloudFront 流传输的更多相关文章

    1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

      我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

    2. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

      我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

    3. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

      类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

    4. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

      很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

    5. ruby - 在 Ruby 中使用匿名模块 - 2

      假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于

    6. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

      我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

    7. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

      关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

    8. ruby-on-rails - 按天对 Mongoid 对象进行分组 - 2

      在控制台中反复尝试之后,我想到了这种方法,可以按发生日期对类似activerecord的(Mongoid)对象进行分组。我不确定这是完成此任务的最佳方法,但它确实有效。有没有人有更好的建议,或者这是一个很好的方法?#eventsisanarrayofactiverecord-likeobjectsthatincludeatimeattributeevents.map{|event|#converteventsarrayintoanarrayofhasheswiththedayofthemonthandtheevent{:number=>event.time.day,:event=>ev

    9. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

      我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

    10. ruby - 使用 ruby​​ 将 HTML 转换为纯文本并维护结构/格式 - 2

      我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h

    随机推荐