草庐IT

python - 名称错误 : name 'redis' is not defined - PySpark - Redis

我在pyspark中使用addPyFile方法加载redis.zip文件。我可以使用加载文件sc.addPyFile("/home/path/to/redis.zip")但是在使用./pyspark运行代码时,它显示错误:NameError:name'redis'isnotdefinedzip(redis.zip)包含.py文件(client.py,connection.py、exceptions.py、lock.py、utils.py等)。Python版本是-3.5,spark是2.7 最佳答案 如果您将py文件打包成zip并使用

mongodb - pyspark-mongodb 集合读取命令不会执行

我安装了以下版本:-Spark2.1.0,斯卡拉2.11.6,mongoDB3.2.17我尝试使用以下命令启动pysparkshell./bin/pyspark--packagesorg.mongodb.spark:mongo-spark-connector_2.11:2.2.0在此之后我开始了sparksession如下frompyspark.sqlimportSparkSessionmy_spark=SparkSession.builder.appName("myApp").config("spark.mongodb.input.uri","mongodb://127.0.0.1/

python - 如何将其他参数传递给pyspark中用户定义的方法以进行过滤方法?

我在python中使用spark并且我有一个过滤器约束如下:my_rdd.filter(my_func)其中my_func是我编写的一种方法,用于根据我自己的逻辑过滤rdd项。我定义了my_func如下:defmy_func(my_item):{...}现在,除了进入它的项目之外,我想将另一个单独的参数传递给my_func。我怎样才能做到这一点?我知道my_item将引用来自my_rdd的一项,我如何将自己的参数(比如说my_param)作为附加参数传递给my_func? 最佳答案 使用以下lambda语法并使用额外参数修改您的my

python - 作为 PySpark 的 reduceByKey 键的列表

我正在尝试对格式为(([a,b,c],1),([a,b,c],1),([a,d,b,e],1),...似乎pyspark不会接受数组作为普通键中的键,通过简单地应用.reduceByKey(add)来减少值。我已经尝试通过.map((x,y):(str(x),y))首先将数组转换为字符串,但这不起作用,因为后处理将字符串返回数组太慢了。有没有办法让pyspark使用数组作为键或使用另一个函数快速将字符串转换回数组?这是相关的错误代码File"/home/jan/Documents/spark-1.4.0/python/lib/pyspark.zip/pyspark/shuffle.py

python - MAP (PySpark) 返回的元组列表(或迭代器)

我有一个映射器方法:defmapper(value):...forkey,valueinsome_list:yieldkey,value实际上,我需要的与普通的wordcount示例相差无几。我已经有了工作脚本,但前提是映射器方法看起来像这样:defmapper(value):...returnkey,value这是它的调用方式:sc.textFile(sys.argv[2],1).map(mapper).reduceByKey(reducer).collect()我花了2个小时尝试编写支持映射器中的生成器的代码。但不能那样做。我什至同意只返回一个列表:defmapper(value)

python - pyspark 在没有 pandas 的情况下将一列拆分为多列

我的问题是如何将一列拆分为多列。我不知道为什么df.toPandas()不起作用。例如,我想将“df_test”更改为“df_test2”。我看到很多使用pandas模块的例子。还有别的办法吗?提前谢谢你。df_test=sqlContext.createDataFrame([(1,'14-Jul-15'),(2,'14-Jun-15'),(3,'11-Oct-15'),],('id','date'))df_test2iddaymonthyear114Jul15214Jun15111Oct15 最佳答案 Spark>=2.2您可以跳

python - PySpark DataFrame 无法删除重复项

您好,我创建了一个spark数据框,我正在尝试删除重复项:df.drop_duplicates(subset='id')我收到以下错误:Py4JError:Anerroroccurredwhilecallingz:org.apache.spark.api.python.PythonUtils.toSeq.Trace:py4j.Py4JException:MethodtoSeq([classjava.lang.String])doesnotexistatpy4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335

python - pyspark 行号数据框

我有一个数据框,包含时间列、a、b、c、d、val。我想创建一个带有附加列的数据框,该列将包含行的行号,在每个组中,其中a、b、c、d是组键。我尝试使用sparksql,通过定义一个窗口函数,特别是在sql中,它看起来像这样:selecttime,a,b,c,d,val,row_number()over(partitionbya,b,c,dorderbytime)asrnfromtablegroupbya,b,c,d,val我想在数据框itslef上执行此操作,而不使用sparksql。谢谢 最佳答案 我不太了解pythonapi,

python - Pyspark 将列类型从日期更改为字符串

我有以下数据框:corr_temp_df[('vacationdate','date'),('valueE','string'),('valueD','string'),('valueC','string'),('valueB','string'),('valueA','string')]现在我想将vacationdate列的数据类型更改为字符串,以便数据框也采用这种新类型并覆盖所有条目的数据类型数据。例如。写完后:corr_temp_df.dtypes应覆盖vacationdate的数据类型。我已经使用过像cast、StringType或astype这样的函数,但我没有成功。你知道怎

python - Pyspark:将列中的 json 分解为多列

数据是这样的——+-----------+-----------+-----------------------------+|id|point|data|+-----------------------------------------------------+|abc|6|{"key1":"124","key2":"345"}||dfl|7|{"key1":"777","key2":"888"}||4bd|6|{"key1":"111","key2":"788"}|我正在尝试将其分解为以下格式。+-----------+-----------+-----------+------