草庐IT

Spark:单词计数(Word Count)的MapReduce实现(Java/Python)

Orion's Blog 2023-03-28 原文

1 导引

我们在博客《Hadoop: 单词计数(Word Count)的MapReduce实现 》中学习了如何用Hadoop-MapReduce实现单词计数,现在我们来看如何用Spark来实现同样的功能。

2. Spark的MapReudce原理

Spark框架也是MapReduce-like模型,采用“分治-聚合”策略来对数据分布进行分布并行处理。不过该框架相比Hadoop-MapReduce,具有以下两个特点:

  • 对大数据处理框架的输入/输出,中间数据进行建模,将这些数据抽象为统一的数据结构命名为弹性分布式数据集(Resilient Distributed Dataset),并在此数据结构上构建了一系列通用的数据操作,使得用户可以简单地实现复杂的数据处理流程。

  • 采用了基于内存的数据聚合、数据缓存等机制来加速应用执行尤其适用于迭代和交互式应用。

Spark社区推荐用户使用Dataset、DataFrame等面向结构化数据的高层API(Structured API)来替代底层的RDD API,因为这些高层API含有更多的数据类型信息(Schema),支持SQL操作,并且可以利用经过高度优化的Spark SQL引擎来执行。不过,由于RDD API更基础,更适合用来展示基本概念和原理,后面我们的代码都使用RDD API。

Spark的RDD/dataset分为多个分区。RDD/Dataset的每一个分区都映射一个或多个数据文件, Spark通过该映射读取数据输入到RDD/dataset中。

因为我们这里采用的本地单机多线程调试模式,默认分区数即为本地机器使用的线程数,若在代码中设置了local[N](使用N个线程),则默认为N个分区;若设为local[*](使用本地CPU核数个线程),则默认分区数为本地CPU核数。大家可以通过调用RDD对象的getNumPartitions()查看实际分区个数。

我们下面的流程描述中,假设每个文件对应一个分区。

Spark的Map示意图如下:

Spark的Reduce示意图如下:

3. Word Count的Java实现

项目架构如下图:

Word-Count-Spark
├─ input
│  ├─ file1.txt
│  ├─ file2.txt
│  └─ file3.txt
├─ output
│  └─ result.txt
├─ pom.xml
├─ src
│  ├─ main
│  │  └─ java
│  │     └─ WordCount.java
│  └─ test
└─ target

WordCount.java文件如下:

package com.orion;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.io.*;
import java.nio.file.*;

public class WordCount {
	private static Pattern SPACE = Pattern.compile(" ");

	public static void main(String[] args) throws Exception {
		if (args.length != 3) {
			System.err.println("Usage: WordCount <intput directory> <output directory> <number of local threads>");
			System.exit(1);
		}
                String input_path = args[0];
                String output_path = args[1];
		int n_threads = Integer.parseInt(args[2]);

		SparkSession spark = SparkSession.builder()
			.appName("WordCount")
			.master(String.format("local[%d]", n_threads))
			.getOrCreate();

		JavaRDD<String> lines = spark.read().textFile(input_path).javaRDD();

		JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
		JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
		JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

		List<Tuple2<String, Integer>> output = counts.collect();

                String filePath = Paths.get(output_path, "result.txt").toString();
                BufferedWriter out = new BufferedWriter(new FileWriter(filePath));
		for (Tuple2<?, ?> tuple : output) {
			out.write(tuple._1() + ": " + tuple._2() + "\n");
		}
		out.close();
                spark.stop();
	}
}

pom.xml文件配置如下:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.WordCount</groupId>
  <artifactId>WordCount</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>WordCount</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <!-- 集中定义版本号 -->
  <properties>
    <scala.version>2.12.10</scala.version>
    <scala.compat.version>2.12</scala.compat.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <project.timezone>UTC</project.timezone>
    <java.version>11</java.version>
    <scoverage.plugin.version>1.4.0</scoverage.plugin.version>
    <site.plugin.version>3.7.1</site.plugin.version>
    <scalatest.version>3.1.2</scalatest.version>
    <scalatest-maven-plugin>2.0.0</scalatest-maven-plugin>
    <scala.maven.plugin.version>4.4.0</scala.maven.plugin.version>
    <maven.compiler.plugin.version>3.8.0</maven.compiler.plugin.version>
    <maven.javadoc.plugin.version>3.2.0</maven.javadoc.plugin.version>
    <maven.source.plugin.version>3.2.1</maven.source.plugin.version>
    <maven.deploy.plugin.version>2.8.2</maven.deploy.plugin.version>
    <nexus.staging.maven.plugin.version>1.6.8</nexus.staging.maven.plugin.version>
    <maven.help.plugin.version>3.2.0</maven.help.plugin.version>
    <maven.gpg.plugin.version>1.6</maven.gpg.plugin.version>
    <maven.surefire.plugin.version>2.22.2</maven.surefire.plugin.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <spark.version>3.2.1</spark.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!--======SCALA======-->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency> <!-- Spark dependency -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
  </dependencies>


  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
          <configuration>
              <source>11</source>
              <target>11</target>
              <fork>true</fork>
              <executable>/Library/Java/JavaVirtualMachines/jdk-11.0.15.jdk/Contents/Home/bin/javac</executable>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

记得配置输入参数inputoutput3分别代表输入目录、输出目录和使用本地线程数(在VSCode中在launch.json文件中配置)。编译运行后可在output目录下查看result.txt

Tom: 1
Hello: 3
Goodbye: 1
World: 2
David: 1

可见成功完成了单词计数功能。

4. Word Count的Python实现

先使用pip按照pyspark==3.8.2

pip install pyspark==3.8.2

注意PySpark只支持Java 8/11,请勿使用更高级的版本。这里我使用的是Java 11。运行java -version可查看本机Java版本。

(base) orion-orion@MacBook-Pro ~ % java -version
java version "11.0.15" 2022-04-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.15+8-LTS-149)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.15+8-LTS-149, mixed mode)

项目架构如下:

Word-Count-Spark
├─ input
│  ├─ file1.txt
│  ├─ file2.txt
│  └─ file3.txt
├─ output
│  └─ result.txt
├─ src
│  └─ word_count.py

word_count.py编写如下:

from pyspark.sql import SparkSession
import sys
import os
from operator import add

if len(sys.argv) != 4:
    print("Usage: WordCount <intput directory> <output directory> <number of local threads>", file=sys.stderr)
    exit(1)
     
input_path, output_path, n_threads = sys.argv[1], sys.argv[2], int(sys.argv[3])

spark = SparkSession.builder.appName("WordCount").master("local[%d]" % n_threads).getOrCreate()

lines = spark.read.text(input_path).rdd.map(lambda r: r[0])

counts = lines.flatMap(lambda s: s.split(" "))\
    .map(lambda word: (word, 1))\
    .reduceByKey(add)

output = counts.collect()

with open(os.path.join(output_path, "result.txt"), "wt") as f:
    for (word, count) in output:
        f.write(str(word) +": " + str(count) + "\n")

spark.stop()

使用python word_count.py input output 3运行后,可在output中查看对应的输出文件result.txt

Hello: 3
World: 2
Goodbye: 1
David: 1
Tom: 1

可见成功完成了单词计数功能。

参考

有关Spark:单词计数(Word Count)的MapReduce实现(Java/Python)的更多相关文章

  1. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  2. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  3. ruby-on-rails - 在 ruby​​ 中使用 gsub 函数替换单词 - 2

    我正在尝试用ruby​​中的gsub函数替换字符串中的某些单词,但有时效果很好,在某些情况下会出现此错误?这种格式有什么问题吗NoMethodError(undefinedmethod`gsub!'fornil:NilClass):模型.rbclassTest"replacethisID1",WAY=>"replacethisID2andID3",DELTA=>"replacethisID4"}end另一个模型.rbclassCheck 最佳答案 啊,我找到了!gsub!是一个非常奇怪的方法。首先,它替换了字符串,所以它实际上修改了

  4. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  5. ruby-on-rails - Ruby on Rails 计数器缓存错误 - 2

    尝试在我的RoR应用程序中实现计数器缓存列时出现错误Unknownkey(s):counter_cache。我在这个问题中实现了模型关联:Modelassociationquestion这是我的迁移:classAddVideoVotesCountToVideos0Video.reset_column_informationVideo.find(:all).eachdo|p|p.update_attributes:videos_votes_count,p.video_votes.lengthendenddefself.downremove_column:videos,:video_vot

  6. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  7. ruby - 使用多个数组创建计数 - 2

    我正在尝试按0-9和a-z的顺序创建数字和字母列表。我有一组值value_array=['0','1','2','3','4','5','6','7','8','9','a','b','光盘','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','','u','v','w','x','y','z']和一个组合列表的数组,按顺序,这些数字可以产生x个字符,比方说三个list_array=[]和一个当前字母和数字组合的数组(在将它插入列表数组之前我会把它变成一个字符串,]current_combo['0','0','0']

  8. Python 相当于 Perl/Ruby ||= - 2

    这个问题在这里已经有了答案:关闭10年前。PossibleDuplicate:Pythonconditionalassignmentoperator对于这样一个简单的问题表示歉意,但是谷歌搜索||=并不是很有帮助;)Python中是否有与Ruby和Perl中的||=语句等效的语句?例如:foo="hey"foo||="what"#assignfooifit'sundefined#fooisstill"hey"bar||="yeah"#baris"yeah"另外,类似这样的东西的通用术语是什么?条件分配是我的第一个猜测,但Wikipediapage跟我想的不太一样。

  9. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

  10. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

随机推荐