草庐IT

Golang AWS S3manager multipartreader w/Goroutines

coder 2024-07-09 原文

我正在创建一个端点,允许用户同时上传多个文件并将它们存储在 S3 中。目前,我可以使用 MultipartReader 和 s3manager 来实现这一点,但只能以非同步方式实现。

我正在尝试实现 Go 例程来加速此功能并将多个文件同时上传到 S3,但数据竞争错误导致了麻烦。我认为 *s3manager 可能不像文档所说的那样是 goroutine 安全的。 (如果将 go-statement 替换为函数代码,代码将同步运行)。

实现互斥锁是否可以修复我的错误?

func uploadHandler(w http.ResponseWriter, r *http.Request) {
    counter := 0
    switch r.Method {
    // GET to display the upload form.
    case "GET":
        err := templates.Execute(w, nil)
        if err != nil {
            log.Print(err)
        }
        // POST uploads each file and sends them to S3
    case "POST":
        c := make(chan string)
        // grab the request.MultipartReader
        reader, err := r.MultipartReader()
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        // copy each part to destination.
        for {
            part, err := reader.NextPart()
            if err == io.EOF {
                break
            }
            // if part.FileName() is empty, skip this iteration.
            if part.FileName() == "" {
                continue
            }
            counter++
            go S3Upload(c, part)
        }
        for i := 0; i < counter; i++ {
          fmt.Println(<-c)
         }
        // displaying a success message.
        err = templates.Execute(w, "Upload successful.")
        if err != nil {
            log.Print(err)
        }
    default:
        w.WriteHeader(http.StatusMethodNotAllowed)
    }
}  

func S3Upload(c chan string, part *multipart.Part) {
    bucket := os.Getenv("BUCKET")
    sess, err := session.NewSession(&aws.Config{
        Region: aws.String(os.Getenv("REGION"))},
    )
    if err != nil {
        c <- "error occured creating session"
        return
    }
    uploader := s3manager.NewUploader(sess)
    _, err = uploader.Upload(&s3manager.UploadInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(part.FileName()),
        Body:   part,
    })
    if err != nil {
        c <- "Error occurred attempting to upload to S3"
        return
    }
    // successful upload
    c <- "successful upload"
}

最佳答案

^ 查看上面的所有评论,

这里是一些修改过的代码示例, channel 在这里没有用。

package main

import (
    "bytes"
    "io"
    "log"
    "net/http"
    "os"
    "strings"
    "sync"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

var (
    setupUploaderOnce sync.Once
    uploader          *s3manager.Uploader
    bucket            string
    region            string
)
// ensure sessions and uploader are setup only once using a Singleton pattern
func setupUploader() {
    setupUploaderOnce.Do(func() {
        bucket = os.Getenv("BUCKET")
        region = os.Getenv("REGION")
        sess, err := session.NewSession(&aws.Config{Region: aws.String(region)})
        if err != nil {
            log.Fatal(err)
        }
        uploader := s3manager.NewUploader(sess)
    })
}

// normally singleton stuff is packaged out and called before starting the server, but to keep the example a single file, load it up here
func init() {
    setupUploader()
}

func uploadHandler(w http.ResponseWriter, r *http.Request) {
    counter := 0
    switch r.Method {
    // GET to display the upload form.
    case "GET":
        err := templates.Execute(w, nil)
        if err != nil {
            log.Print(err)
        }
        // POST uploads each file and sends them to S3
    case "POST":
        var buf bytes.Buffer
        // "file" is defined by the form field, change it to whatever your form sets it too
        file, header, err := r.FormFile("file")
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        // close the file
        defer file.Close()
        fileName := strings.Split(header.Filename, ".")
        // load the entire file data to the buffer
        _, err = io.Copy(&buf, file)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        // copy each part to destination.
        go S3Upload(buf, fileName[0])
        // displaying a success message.
        err = templates.Execute(w, "Upload successful.")
        if err != nil {
            log.Print(err)
        }
    default:
        w.WriteHeader(http.StatusMethodNotAllowed)
    }
}

// keeping this simple, do something with the err, like log
// if the uploader fails in the goroutine, there is potential
// for false positive uploads... channels are not really good here
// either, for that, bubble the error up,
// and don't spin up a goroutine.. same thing as waiting for the channel to return.
func S3Upload(body bytes.Buffer, fileName string) {
    _, err := uploader.Upload(&s3manager.UploadInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(fileName),
        Body:   bytes.NewReader(body.Bytes()),
    })
}

关于Golang AWS S3manager multipartreader w/Goroutines,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51128286/

有关Golang AWS S3manager multipartreader w/Goroutines的更多相关文章

  1. go - http中的Goroutines - 2

    Thisquestionalreadyhasananswerhere:Gowebserverrequestsspawnitsowngoroutine?(1个答案)去年关闭。我对http中的goroutines有疑问。在下面的代码中是一个简单的Web服务器。如果有5个人访问服务器,则2个人进入handler1()函数,3个人进入handler2(),golang将创建5个goroutine或我是否需要保留字go?例如gohttp.HandleFunc("/h1",handler1)packagemainimport("fmt""log""net/http")funchandler1(wh

  2. go - goroutines 的意外输出 - 2

    我正在使用Go并发并具有以下代码:packagemainimport("fmt""runtime""sync")funcmain(){runtime.GOMAXPROCS(1)varwgsync.WaitGroupwg.Add(2)fmt.Println("StartingGoroutines")gofunc(){deferwg.Done()forcount:=0;count我的输出是:StartingGoroutinesWaitingtoFinishABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOP

  3. go - 如何存储 goroutines 的标识以便我以后可以停止它们 - 2

    这个问题在这里已经有了答案:Howtostopagoroutine(8个答案)HowdoIkillagoroutine(1个回答)HowcanIidentifythreadsorsimilarinlog?(3个答案)Terminatethesecondgoroutine(3个答案)关闭3年前。我正在尝试创建多个goroutine并让它们同时运行。然后当一个请求进来时,我想识别其中一个,并只停止那个特定的goroutine,而其余的则继续文件1mm:=remote_method.NewPlayerCreator()mm.NewPlayer("Leo","Messi")//Letsjust

  4. go - goroutines的执行顺序 - 2

    我对golang很陌生。我的理解是,所有的go-routines都会同时执行。两个匿名goroutines将同时开始执行。但是当我运行这段代码时,它总是打印a=1firstexecuteda=1secondexecutedpanic:b!=1不应该打印a=1a=1firstexecutedResponsetrueandsoon或b=1b=1firstexecutedResponsetrueandsoon既然向channel发送了一个值后,相应的协程应该阻塞并等待接收者?funcmain(){vara,bintvarc=make(chanbool)gofunc(){b=1fmt.Prin

  5. Goroutines 和 mutex - 2

    关闭。这个问题需要detailsorclarity.它目前不接受答案。想改进这个问题吗?通过editingthispost添加细节并澄清问题.关闭4年前。Improvethisquestionfunc(s*Server)start(){s.Lock()defers.Unlock()if!s.isClosed{gos.processing()}gos.start()}func(s*Server)processing(){s.Lock()//dostuffs.Unlock()}我有一个工作的Golang项目,其中有一段代码遵循上面显示的逻辑。我不明白为什么这个逻辑像我预料的那样会出现死锁。

  6. go - 为什么 Goroutines 的执行时间相差很大? - 2

    我只是在测量一组goroutine的执行时间。这意味着:我开始测量,然后启动20个goroutine并在它们完成后立即停止测量。我重复该过程大约4次,然后比较4次执行时间。有时,这些执行时间会有很大差异:20个goroutine的第一次运行:1.2毫秒20个协程的第2次运行:1.9毫秒20个协程的第3次运行:1.4毫秒20个goroutine的第4次运行:17.0毫秒!为什么有时差异如此之大?有什么办法可以避免吗? 最佳答案 Whydoesitsometimesdiffersosignificantly?执行时间在某些时候总是不可预

  7. go - 我想在 goroutines 之间通信并无限期地阻塞主线程 - 2

    我如何阻止mainfunc并允许goroutines通过channel进行通信以下代码示例会抛出错误0fatalerror:所有goroutines都睡着了-死锁!packagemainimport("fmt""time")funcmain(){ch:=make(chanint)gofunc(){value:= 最佳答案 我想你想打印所有值[0:99]。然后你需要在第一个goroutine中循环。另外,你需要传递信号来打破循环funcmain(){ch:=make(chanint)stopProgram:=make(chanbool

  8. 去死锁所有 goroutines 睡着了 - 2

    这是我之前帖子的后续:http://stackoverflow.com/questions/34736825/goroutine-exit-status-2-what-does-it-mean-why-is-it-happening?noredirect=1#comment57238789_34736825在阅读了SO内外的多个主题和文章之后,我仍然无法弄清楚应该在哪里关闭channel。该程序将打开一个文件列表,为每个输入文件创建一个输出文件(具有相同的名称),访问每个输入文件中的所有url并从中获取所有href链接-这些链接将保存到相应的输出文件。但是,我收到以下错误:http:/

  9. go - 反正执行 goroutine 时所有 goroutines 都睡着了 - 2

    我收到以下错误,我不明白为什么:发送:查询Herefatal错误:所有goroutines都睡着了-死锁!您可以看到我正在调用我使用goroutine创建的函数routine。我真的没有更多细节可以提供。packagemainimport("fmt""net""sync")const(udphost="127.0.0.1"udpport=":150"StopCharacter="\r\n\r\n")varwgsync.WaitGroupfuncroutine(){deferwg.Done()//establishconnectionaddresspartsschemaUri:=udph

  10. go - http.Client 和 goroutines 的不可预测的结果 - 2

    我是Golang的新手,试图构建一个系统,从一组url中获取内容并使用正则表达式提取特定行。当我用goroutines包装代码时,问题就开始了。我得到了不同数量的正则表达式结果,并且许多提取的行都是重复的。max_routines:=3sem:=make(chanint,max_routines)//tocontrolthenumberofworkingroutinesvarwgsync.WaitGroupch_content:=make(chanstring)client:=http.Client{}fori:=2;;i++{//fortestingifi>5{break}//loo

随机推荐