
java - How to load data from Cloud Storage into BigQuery using Java

coder 2024-04-02 Original post

I want to load data from Google Cloud Storage into a table in BigQuery. Here is my code that creates the job:

public class LoadStorageToBigQuery {

// ///////////////////////
// USER GENERATED VALUES: you must fill in values specific to your
// application.
//
// Visit the Google API Console to create a Project and generate an
// OAuth 2.0 Client ID and Secret (http://code.google.com/apis/console).
// Then, add the Project ID below, and update the clientsecrets.json file
// with your client_id and client_secret
//
// ///////////////////////
private static final String PROJECT_ID = "gavd.com:compute";
private static final String CLIENTSECRETS_LOCATION = "/client_secrets.json";
private static final String RESOURCE_PATH =
        ("E:/Work On/ads/Cloud/Dev/Source/BigQueryDemo02" + CLIENTSECRETS_LOCATION)
                .replace('/', File.separatorChar);
static GoogleClientSecrets clientSecrets = loadClientSecrets();

// Static variables for API scope, callback URI, and HTTP/JSON functions
private static final List<String> SCOPES = Arrays
        .asList("https://www.googleapis.com/auth/bigquery");
private static final String REDIRECT_URI = "urn:ietf:wg:oauth:2.0:oob";
private static final HttpTransport TRANSPORT = new NetHttpTransport();
private static final JsonFactory JSON_FACTORY = new JacksonFactory();

private static GoogleAuthorizationCodeFlow flow = null;

/**
 * @param args
 * @throws IOException
 * @throws InterruptedException
 */
public static void main(String[] args) throws IOException,
        InterruptedException {
    System.out.println(CLIENTSECRETS_LOCATION);
    // Create a new BigQuery client authorized via OAuth 2.0 protocol
    Bigquery bigquery = createAuthorizedClient();

    // Print out available datasets to the console
    listDatasets(bigquery, "publicdata");
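    // Note: startQuery is called twice below, so two separate load jobs are
    // started. The ID printed here belongs to the first job, while the job
    // polled by checkQueryResults is the second one.
    JobReference jobId = startQuery(bigquery, PROJECT_ID);
    System.out.println("Job ID = " + jobId);
    JobReference jobRef = startQuery(bigquery, PROJECT_ID);
    checkQueryResults(bigquery, PROJECT_ID, jobRef);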

}

/**
 * Creates an authorized BigQuery client service using the OAuth 2.0
 * protocol
 * 
 * This method first creates a BigQuery authorization URL, then prompts the
 * user to visit this URL in a web browser to authorize access. The
 * application will wait for the user to paste the resulting authorization
 * code at the command line prompt.
 * 
 * @return an authorized BigQuery client
 * @throws IOException
 */
public static Bigquery createAuthorizedClient() throws IOException {

    String authorizeUrl = new GoogleAuthorizationCodeRequestUrl(
            clientSecrets, REDIRECT_URI, SCOPES).setState("").build();

    System.out
            .println("Paste this URL into a web browser to authorize BigQuery Access:\n"
                    + authorizeUrl);

    System.out.println("... and type the code you received here: ");
    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
    String authorizationCode = in.readLine();

    // Exchange the auth code for an access token and refresh token
    Credential credential = exchangeCode(authorizationCode);

    return Bigquery.builder(TRANSPORT, JSON_FACTORY)
            .setHttpRequestInitializer(credential)
            .setApplicationName("Your User Agent Here").build();
}

/**
 * Display all BigQuery Datasets associated with a Project
 * 
 * @param bigquery
 *            an authorized BigQuery client
 * @param projectId
 *            a string containing the current project ID
 * @throws IOException
 */
public static void listDatasets(Bigquery bigquery, String projectId)
        throws IOException {
    Datasets.List datasetRequest = bigquery.datasets().list(projectId);
    DatasetList datasetList = datasetRequest.execute();
    if (datasetList.getDatasets() != null) {
        List<DatasetList.Datasets> datasets = datasetList.getDatasets();
        System.out.println("Available datasets\n----------------");
        System.out.println( " B = " + datasets.toString());
        for (DatasetList.Datasets dataset : datasets) {
            System.out.format("%s\n", dataset.getDatasetReference()
                    .getDatasetId());
        }
    }
}


public static JobReference startQuery(Bigquery bigquery, String projectId) throws IOException {

    Job job = new Job();
    JobConfiguration config = new JobConfiguration();
    JobConfigurationLoad loadConfig = new JobConfigurationLoad();
    config.setLoad(loadConfig);

    job.setConfiguration(config);

    // Set where you are importing from (i.e. the Google Cloud Storage paths).
    List<String> sources = new ArrayList<String>();
    sources.add("gs://gms_cloud_project/bigquery_data/06_13_2014/namesbystate.csv");
    loadConfig.setSourceUris(sources);
    //state:STRING,sex:STRING,year:INTEGER,name:STRING,occurrence:INTEGER
    // Describe the resulting table you are importing to:
    TableReference tableRef = new TableReference();
    tableRef.setDatasetId("gimasys_database");
    tableRef.setTableId("table_test");
    tableRef.setProjectId(projectId);
    loadConfig.setDestinationTable(tableRef);

    List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
    TableFieldSchema fieldState = new TableFieldSchema();
    fieldState.setName("state");
    fieldState.setType("STRING");
    TableFieldSchema fieldSex = new TableFieldSchema();
    fieldSex.setName("sex");
    fieldSex.setType("STRING");
    TableFieldSchema fieldName = new TableFieldSchema();
    fieldName.setName("name");
    fieldName.setType("STRING");
    TableFieldSchema fieldYear = new TableFieldSchema();
    fieldYear.setName("year");
    fieldYear.setType("INTEGER");
    TableFieldSchema fieldOccur = new TableFieldSchema();
    fieldOccur.setName("occurrence");
    fieldOccur.setType("INTEGER");
    fields.add(fieldState);
    fields.add(fieldSex);
    fields.add(fieldName);
    fields.add(fieldYear);
    fields.add(fieldOccur);
    TableSchema schema = new TableSchema();
    schema.setFields(fields);
    loadConfig.setSchema(schema);

    // Also set custom delimiter or header rows to skip here....
    // [not shown].

    Insert insert = bigquery.jobs().insert(projectId, job);
    insert.setProjectId(projectId);
    JobReference jobRef =  insert.execute().getJobReference();

    // ... see rest of codelab for waiting for job to complete.
    return jobRef;
    //return jobId;
}

/**
 * Polls the status of a BigQuery job, returns Job reference if "Done"
 * 
 * @param bigquery
 *            an authorized BigQuery client
 * @param projectId
 *            a string containing the current project ID
 * @param jobId
 *            a reference to an inserted query Job
 * @return a reference to the completed Job
 * @throws IOException
 * @throws InterruptedException
 */
private static Job checkQueryResults(Bigquery bigquery, String projectId,
        JobReference jobId) throws IOException, InterruptedException {
    // Variables to keep track of total query time
    long startTime = System.currentTimeMillis();
    long elapsedTime;

    while (true) {
        Job pollJob = bigquery.jobs().get(projectId, jobId.getJobId())
                .execute();
        elapsedTime = System.currentTimeMillis() - startTime;
        System.out.format("Job status (%dms) %s: %s\n", elapsedTime,
                jobId.getJobId(), pollJob.getStatus().getState());
        if (pollJob.getStatus().getState().equals("DONE")) {
            return pollJob;
        }
        // Pause execution for one second before polling job status again,
        // to reduce unnecessary calls to the BigQuery API and lower overall
        // application bandwidth.
        Thread.sleep(1000);
    }
}

/**
 * Helper to load client ID/Secret from file.
 * 
 * @return a GoogleClientSecrets object based on a clientsecrets.json
 */
private static GoogleClientSecrets loadClientSecrets() {
    try {
        System.out.println("A");
        System.out.println(CLIENTSECRETS_LOCATION);
        GoogleClientSecrets clientSecrets = GoogleClientSecrets.load(new JacksonFactory(),
                    new FileInputStream(new File(
                        RESOURCE_PATH)));
        return clientSecrets;
    } catch (Exception e) {
        System.out.println("Could not load file client_secrets.json");
        e.printStackTrace();
    }
    return clientSecrets;
}

/**
 * Exchange the authorization code for OAuth 2.0 credentials.
 * 
 * @return the OAuth 2.0 credential obtained for the authorization code
 */
static Credential exchangeCode(String authorizationCode) throws IOException {
    GoogleAuthorizationCodeFlow flow = getFlow();
    GoogleTokenResponse response = flow.newTokenRequest(authorizationCode)
            .setRedirectUri(REDIRECT_URI).execute();
    return flow.createAndStoreCredential(response, null);
}

/**
 * Build an authorization flow and store it as a static class attribute.
 * 
 * @return a Google Auth flow object
 */
static GoogleAuthorizationCodeFlow getFlow() {
    if (flow == null) {
        HttpTransport httpTransport = new NetHttpTransport();
        JacksonFactory jsonFactory = new JacksonFactory();

        flow = new GoogleAuthorizationCodeFlow.Builder(httpTransport,
                jsonFactory, clientSecrets, SCOPES)
                .setAccessType("offline").setApprovalPrompt("force")
                .build();
    }
    return flow;
}
}

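When the table is created the status reports "DONE", but when I check my project at console.google.com nothing has been loaded. So far we have not seen any error, problem, or exception, yet no data was uploaded to my table (table size 0 B). I also tried creating the table without any data and then uploading data into it, but that did not work either.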

Job ID = {"jobId":"job_MqfuhuAU1Ms0GIOSbiePFGlc6TE","projectId":"ads.com:compute"}
Job status (451ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (2561ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (6812ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (8273ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (9695ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (11146ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (12466ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (13948ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (15392ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (16796ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (18296ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: RUNNING
Job status (19755ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: RUNNING
Job status (21587ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: DONE

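Any help would be appreciated.

Thanks

Best answer

pollJob.getStatus().getState().equals("DONE") tells you when the job has finished, but it does not give you an exit code: a job that failed also ends in the DONE state.

You should explicitly check the error result, pollJob.getStatus().getErrorResult():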

    // bigquery, projectId, jobId, startTime and elapsedTime are the ones
    // used by checkQueryResults in the question.
    while (true) {
        Job pollJob = bigquery.jobs().get(projectId, jobId.getJobId()).execute();
        elapsedTime = System.currentTimeMillis() - startTime;
        if (pollJob.getStatus().getErrorResult() != null) {
            // The job ended with an error.
            String message = String.format("Job %s ended with error %s",
                    jobId.getJobId(), pollJob.getStatus().getErrorResult().getMessage());
            System.out.println(message);
            throw new RuntimeException(message);
        }
        System.out.format("Job status (%dms) %s: %s\n", elapsedTime,
                jobId.getJobId(), pollJob.getStatus().getState());

        if (pollJob.getStatus().getState().equals("DONE")) {
            break;
        }
        Thread.sleep(5000);
    }
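
The same check can be tried out without a BigQuery connection. The following is only a minimal sketch under stated assumptions, not the answer author's code: `LoadJobPollSketch`, its nested `Status` class, and `waitForSuccess` are hypothetical names, and `Status` merely imitates the state and error-result fields of a job status. The statuses come from an in-memory list rather than from the API.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class LoadJobPollSketch {

    /** Hypothetical stand-in for the state and error result of a job status. */
    public static final class Status {
        final String state;        // "PENDING", "RUNNING" or "DONE"
        final String errorMessage; // null when the job reported no error

        public Status(String state, String errorMessage) {
            this.state = state;
            this.errorMessage = errorMessage;
        }
    }

    /**
     * Reads statuses until one is DONE and returns the number of polls made.
     * Throws IllegalStateException as soon as a status carries an error.
     */
    public static int waitForSuccess(Iterator<Status> statuses, long pauseMillis) {
        int polls = 0;
        while (statuses.hasNext()) {
            Status status = statuses.next();
            polls++;
            // Check the error first: DONE alone does not mean success.
            if (status.errorMessage != null) {
                throw new IllegalStateException("Job ended with error: " + status.errorMessage);
            }
            if ("DONE".equals(status.state)) {
                return polls;
            }
            pause(pauseMillis);
        }
        throw new IllegalStateException("No more statuses before the job reached DONE");
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the job", e);
        }
    }

    public static void main(String[] args) {
        // A job that finishes cleanly.
        List<Status> succeeded = Arrays.asList(
                new Status("PENDING", null), new Status("RUNNING", null), new Status("DONE", null));
        System.out.println("Polls until success: " + waitForSuccess(succeeded.iterator(), 10));

        // A job that reaches DONE with an error (the message is made up).
        List<Status> failed = Arrays.asList(
                new Status("RUNNING", null), new Status("DONE", "made-up error message"));
        try {
            waitForSuccess(failed.iterator(), 10);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the real client the statuses would come from repeated `bigquery.jobs().get(...)` calls, as in the loop above. The point of the sketch is only that the error check has to run independently of the DONE check.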

Regarding java - How to load data from Cloud Storage into BigQuery using Java, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/24259394/
