草庐IT

json - 将数据集写入 Hive 时出现异常

coder 2024-01-10 原文

我正在尝试使用 Spark Java 将 DataSet 写入 Hive 数据库,但在此过程中出现异常。

这是我的代码:

 Dataset<Row> data = spark.read().json(rdd).select("event.event_name");
 data.write().mode("overwrite").saveAsTable("telecom.t2");

这里,rdd 是流式传输的 json 数据,我可以通过以下命令打印结果 data

data.show();

但是当我尝试将此结果写入 Hive 数据库时,我没有收到任何异常,但当我尝试打印这些值时,我在 Hive 命令行 中收到异常。例如:

select * from telecom.t2;

异常(exception)情况是:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:317)
    at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:219)
    at org.xerial.snappy.Snappy.<clinit>(Snappy.java:44)
    at parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:62)
    at parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
    at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:89)
    at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:72)
    at parquet.column.Encoding$1.initDictionary(Encoding.java:89)
    at parquet.column.Encoding$4.initDictionary(Encoding.java:148)
    at parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:337)
    at parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:66)
    at parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:61)
    at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:270)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
    at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
    at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
    at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
    at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:122)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:85)
    at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:72)
    at org.apache.hadoop.hive.ql.exec.FetchOperator$FetchInputFormatSplit.getRecordReader(FetchOperator.java:673)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:323)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:445)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:414)
    at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:140)
    at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:1670)
    at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233)
    at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:165)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
    at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:736)
    at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:681)
    at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:621)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path
    at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
    at java.lang.Runtime.loadLibrary0(Runtime.java:870)
    at java.lang.System.loadLibrary(System.java:1122)
    at org.xerial.snappy.SnappyNativeLoader.loadLibrary(SnappyNativeLoader.java:52)
    ... 48 more
Exception in thread "main" org.xerial.snappy.SnappyError: [FAILED_TO_LOAD_NATIVE_LIBRARY] null
    at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:229)
    at org.xerial.snappy.Snappy.<clinit>(Snappy.java:44)
    at parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:62)
    at parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
    at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:89)
    at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:72)
    at parquet.column.Encoding$1.initDictionary(Encoding.java:89)
    at parquet.column.Encoding$4.initDictionary(Encoding.java:148)
    at parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:337)
    at parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:66)
    at parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:61)
    at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:270)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
    at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
    at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
    at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
    at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:122)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:85)
    at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:72)
    at org.apache.hadoop.hive.ql.exec.FetchOperator$FetchInputFormatSplit.getRecordReader(FetchOperator.java:673)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:323)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:445)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:414)
    at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:140)
    at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:1670)
    at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233)
    at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:165)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
    at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:736)
    at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:681)
    at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:621)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
2 Jan, 2017 12:02:40 PM WARNING: parquet.hadoop.ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
2 Jan, 2017 12:02:40 PM INFO: parquet.hadoop.InternalParquetRecordReader: RecordReader initialized will read a total of 12 records.
2 Jan, 2017 12:02:40 PM INFO: parquet.hadoop.InternalParquetRecordReader: at row 0. reading next block
2 Jan, 2017 12:02:40 PM INFO: parquet.hadoop.InternalParquetRecordReader: block read in memory in 29 ms. row count = 12

最佳答案

当您调用 saveAsTable 时,Spark 默认以 parquet.snappy 格式保存数据,并且您似乎在 hive 库路径中没有 snappy。更改编写器格式(例如更改为 json)将不起作用,因为 Hive 需要使用此选项创建的表中的序列文件。

但是您可以在将数据保存为表之前更改压缩算法:

spark.conf.set("spark.sql.parquet.compression.codec", "gzip")

默认情况下,Gzip 压缩在 Hive 上应该可用,如果出现任何问题,您仍然可以在不压缩的情况下保存数据:

spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")

关于json - 将数据集写入 Hive 时出现异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41422390/

有关json - 将数据集写入 Hive 时出现异常的更多相关文章

  1. ruby - ECONNRESET (Whois::ConnectionError) - 尝试在 Ruby 中查询 Whois 时出错 - 2

    我正在用Ruby编写一个简单的程序来检查域列表是否被占用。基本上它循环遍历列表,并使用以下函数进行检查。require'rubygems'require'whois'defcheck_domain(domain)c=Whois::Client.newc.query("google.com").available?end程序不断出错(即使我在google.com中进行硬编码),并打印以下消息。鉴于该程序非常简单,我已经没有什么想法了-有什么建议吗?/Library/Ruby/Gems/1.8/gems/whois-2.0.2/lib/whois/server/adapters/base.

  2. ruby - 在 64 位 Snow Leopard 上使用 rvm、postgres 9.0、ruby 1.9.2-p136 安装 pg gem 时出现问题 - 2

    我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po

  3. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  4. Ruby 写入和读取对象到文件 - 2

    好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信

  5. ruby-on-rails - Rails HTML 请求渲染 JSON - 2

    在我的Controller中,我通过以下方式在我的index方法中支持HTML和JSON:respond_todo|format|format.htmlformat.json{renderjson:@user}end在浏览器中拉起它时,它会自然地以HTML呈现。但是,当我对/user资源进行内容类型为application/json的curl调用时(因为它是索引方法),我仍然将HTML作为响应。如何获取JSON作为响应?我还需要说明什么? 最佳答案 您应该将.json附加到请求的url,提供的格式在routes.rb的路径中定义。这

  6. ruby-on-rails - Rails - 乐观锁定总是触发 StaleObjectError 异常 - 2

    我正在学习Rails,并阅读了关于乐观锁的内容。我已将类型为integer的lock_version列添加到我的articles表中。但现在每当我第一次尝试更新记录时,我都会收到StaleObjectError异常。这是我的迁移:classAddLockVersionToArticle当我尝试通过Rails控制台更新文章时:article=Article.first=>#我这样做:article.title="newtitle"article.save我明白了:(0.3ms)begintransaction(0.3ms)UPDATE"articles"SET"title"='dwdwd

  7. 使用 ACL 调用 upload_file 时出现 Ruby S3 "Access Denied"错误 - 2

    我正在尝试编写一个将文件上传到AWS并公开该文件的Ruby脚本。我做了以下事情:s3=Aws::S3::Resource.new(credentials:Aws::Credentials.new(KEY,SECRET),region:'us-west-2')obj=s3.bucket('stg-db').object('key')obj.upload_file(filename)这似乎工作正常,除了该文件不是公开可用的,而且我无法获得它的公共(public)URL。但是当我登录到S3时,我可以正常查看我的文件。为了使其公开可用,我将最后一行更改为obj.upload_file(file

  8. ruby - #之间? Cooper 的 *Beginning Ruby* 中的错误或异常 - 2

    在Cooper的书BeginningRuby中,第166页有一个我无法重现的示例。classSongincludeComparableattr_accessor:lengthdef(other)@lengthother.lengthenddefinitialize(song_name,length)@song_name=song_name@length=lengthendenda=Song.new('Rockaroundtheclock',143)b=Song.new('BohemianRhapsody',544)c=Song.new('MinuteWaltz',60)a.betwee

  9. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  10. ruby - 在 Ruby 中重新分配常量时抛出异常? - 2

    我早就知道Ruby中的“常量”(即大写的变量名)不是真正常量。与其他编程语言一样,对对象的引用是唯一存储在变量/常量中的东西。(侧边栏:Ruby确实具有“卡住”引用对象不被修改的功能,据我所知,许多其他语言都没有提供这种功能。)所以这是我的问题:当您将一个值重新分配给常量时,您会收到如下警告:>>FOO='bar'=>"bar">>FOO='baz'(irb):2:warning:alreadyinitializedconstantFOO=>"baz"有没有办法强制Ruby抛出异常而不是打印警告?很难弄清楚为什么有时会发生重新分配。 最佳答案

随机推荐