QQ 咨询
喝杯饮料
微信交流
wx公众号
432

在 inteillj idea 中使用 Spark 操作 Hive
如何文章对你有用的话,就点击左边悬浮按钮,请小编喝杯饮料吧
时间: 2022-05-18 16:37:58  作者:北桥苏  阅读:(15)


前言:

         都知道,小编前面已经简单介绍过在 windows 下 hadoop 和 hive 环境搭建和基本使用。这次的 Spark 有点突兀,但是也可以先忽略,重要的是先在 IDEA 中安装 bigData 插件连接 hadoop 已经 HDFS,而后再简单介绍使用 Spark 操作 Hive。

 

Big Data Tools 安装:


1. 点击 File, 选择 Settings,再选择 Plugins 搜索 Big Data Tools,最后下载安装。

2. 下载完毕后,底部和右侧栏会多出 Hadoop 或 Big Data Tools 的选项。

 

连接方法:


1. 进入 hadoop 的 sbin 目录,start-all 启动成功,打开 web 控制台 127.0.0.1:50070 (默认),记住如下标志的节点地址,后面 hdfs 连接的就是这个。

2. 只要 hadoop 启动成功后,打开 IDEA 的 hadoop 其实就可以正常自动连接了。

3. 或者打开右侧栏的 Big Data Tools,添加一个连接,Hadoop。

4. 连接 Hdfs。

(1). 点击右侧栏 Big Data Tools 新增 Hdfs。

(2). 重要的就是 Authentication type,选择 Explicit uri。File system URI 填写的就是上面控制台的节点地址。

(3). 连接成功后就可以清晰的看到 HDFS 的目录,并且可以创建,删除和上传。不过需要对指定路径授权。

 

Hive 操作:


          关于操作 Hive, 以下基于 Maven 构建 Scala 项目。项目创建和 Hive 就略过了,好像在 Kafka 一文中介绍过如何新建 Maven 的 Scala,而 Hive 的产品还是原理介绍网上比较多,以下主要是小编的日志式记录,所以以过程居多,那么就开始了。

1. pom.xml 添加如下依赖并安装 (其实是我整个文件,不需要的可以根据注释删除)。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0modelVersion>
  <groupId>org.examplegroupId>
  <artifactId>maven_scala_testartifactId>
  <version>1.0-SNAPSHOTversion>
  <name>${project.artifactId}name>
  <description>My wonderfull scala appdescription>
  <inceptionYear>2015inceptionYear>
  <licenses>
    <license>
      <name>My Licensename>
      <url>http://....url>
      <distribution>repodistribution>
    license>
  licenses>

  <properties>
    <maven.compiler.source>1.6maven.compiler.source>
    <maven.compiler.target>1.6maven.compiler.target>
    <encoding>UTF-8encoding>
    <scala.version>2.11.5scala.version>
    <scala.compat.version>2.11scala.compat.version>
    <spark.version>2.2.0spark.version>
    <hadoop.version>2.6.0hadoop.version>
    <hbase.version>1.2.0hbase.version>
  properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-langgroupId>
      <artifactId>scala-libraryartifactId>
      <version>${scala.version}version>
    dependency>

    
    <dependency>
      <groupId>junitgroupId>
      <artifactId>junitartifactId>
      <version>4.11version>

    dependency>
    <dependency>
      <groupId>org.specs2groupId>
      <artifactId>specs2-core_${scala.compat.version}artifactId>
      <version>2.4.16version>

    dependency>
    <dependency>
      <groupId>org.scalatestgroupId>
      <artifactId>scalatest_${scala.compat.version}artifactId>
      <version>2.2.4version>

    dependency>

    
    <dependency>
      <groupId>org.scala-langgroupId>
      <artifactId>scala-libraryartifactId>
      <version>${scala.version}version>
    dependency>

    
    <dependency>
      <groupId>org.apache.sparkgroupId>
      <artifactId>spark-core_2.11artifactId>
      <version>${spark.version}version>
    dependency>
    <dependency>
      <groupId>org.apache.sparkgroupId>
      <artifactId>spark-sql_2.11artifactId>
      <version>${spark.version}version>
    dependency>
    <dependency>
      <groupId>org.apache.sparkgroupId>
      <artifactId>spark-streaming_2.11artifactId>
      <version>${spark.version}version>
      <scope>providedscope>
    dependency>
    <dependency>
      <groupId>org.apache.sparkgroupId>
      <artifactId>spark-mllib_2.11artifactId>
      <version>${spark.version}version>
      <scope>providedscope>
    dependency>
    <dependency>
      <groupId>org.apache.sparkgroupId>
      <artifactId>spark-hive_2.11artifactId>
      <version>${spark.version}version>
      
    dependency>



    
    <dependency>
      <groupId>org.apache.hadoopgroupId>
      <artifactId>hadoop-clientartifactId>
      <version>${hadoop.version}version>
    dependency>
    <dependency>
      <groupId>org.apache.hadoopgroupId>
      <artifactId>hadoop-commonartifactId>
      <version>${hadoop.version}version>
    dependency>
    <dependency>
      <groupId>org.apache.hadoopgroupId>
      <artifactId>hadoop-hdfsartifactId>
      <version>${hadoop.version}version>
    dependency>

    
    <dependency>
      <groupId>org.apache.hbasegroupId>
      <artifactId>hbase-clientartifactId>
      <version>${hbase.version}version>
    dependency>
    <dependency>
      <groupId>org.apache.hbasegroupId>
      <artifactId>hbase-serverartifactId>
      <version>${hbase.version}version>
    dependency>

    
    <dependency>
      <groupId>org.apache.kafkagroupId>
      <artifactId>kafka_2.11artifactId>
      <version>1.1.0version>
    dependency>

    <dependency>
      <groupId>org.apache.kafkagroupId>
      <artifactId>kafka-clientsartifactId>
      <version>1.1.0version>
    dependency>

  dependencies>

  <build>
    <sourceDirectory>src/main/scalasourceDirectory>
    <testSourceDirectory>src/test/scalatestSourceDirectory>
    <plugins>
      <plugin>
        
        <groupId>net.alchim31.mavengroupId>
        <artifactId>scala-maven-pluginartifactId>
        <version>3.2.0version>
        <executions>
          <execution>
            <goals>
              <goal>compilegoal>
              <goal>testCompilegoal>
            goals>
            <configuration>
              <args>

                <arg>-dependencyfilearg>
                <arg>${project.build.directory}/.scala_dependenciesarg>
              args>
            configuration>
          execution>
        executions>
      plugin>
      <plugin>
        <groupId>org.apache.maven.pluginsgroupId>
        <artifactId>maven-surefire-pluginartifactId>
        <version>2.18.1version>
        <configuration>
          <useFile>falseuseFile>
          <disableXmlReport>truedisableXmlReport>
          
          
          <includes>
            <include>**/*Test.*include>
            <include>**/*Suite.*include>
          includes>
        configuration>
      plugin>
    plugins>
  build>
project>

2. 项目的 resources 新建元数据文件,可以是 txt,以空格为列,换行为行,这里对 hive 表格创建时重要。

在通过 HQL 创建表格,如何没有指定分列和分行表示,再通过 HQL 的 select 查询数据都是 NULL,具体可以看下面代码演示。

3. 加载源数据文件,只需要项目根目录以下的路径即可。比如 resouces 下的 hello.txt 只需要指定

src/main/resources/hello.txt

4. Hive 相关操作的代码。

这里需要注意的是,hive 中的 Default(默认)数据仓库的最原始位置是在 hdfs 上的 /user/hive/warehouse,也就是以后在默认下,新建的表都在那个目录下。

而仓库的原始位置是本地的 /usr/local/hive/conf/hive-default.xml.template 文件里配置
package com.xudong

import org.apache.spark.sql.SparkSession

object TestSparkHiveHql {

  def main(args: Array[String]): Unit = {

      // 创建spark环境
      val spark = SparkSession
        .builder()
        .appName("Spark Hive HQL")
        .master("local[*]")
        .config("spark.sql.warehouse.dir","hdfs://rebuildb.xdddsd75.com:9500/user/hive/warehouse")
        .enableHiveSupport()
        .getOrCreate();

    import spark.implicits._
    import spark.sql

    // 显示HDFS数据库
    spark.sql("show databases").show();
    // 使用指定数据库
    spark.sql("use default");
    // 创建表格并约定字段
    spark.sql("CREATE TABLE users(id int, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LINES TERMINATED BY '\\n' STORED AS TEXTFILE");
    // 将本地数据加载到表格
    spark.sql("LOAD DATA LOCAL INPATH 'src/main/resources/hello.txt' overwrite into table users");

    // 查询表格数据HQL
    spark.sql("SELECT * FROM users").show()

    // 聚合统计表格数据条数HQL
    spark.sql("SELECT COUNT(*) FROM users").show()

    // 多表关联查询
    spark.sql("select * from info i join users u on u.id = i.user_id").show();

  }

}

5. hdfs 简单操作示例。

package com.xudong

package com.dkl.leanring.spark.hdfs
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import scala.collection.mutable.ArrayBuffer

/**
 * 主要目的是打印某个hdfs目录下所有的文件名,包括子目录下的
 * 其他的方法只是顺带示例,以便有其它需求可以参照改写
 */
object FilesList {

  def main(args: Array[String]): Unit = {

    val path = "hdfs://rebuildb.hhyp75.com:9500/tmp/hive"
    println("打印所有的文件名,包括子目录")

    listAllFiles(path)

    println("打印一级文件名")

    listFiles(path)
    println("打印一级目录名")

    listDirs(path)
    println("打印一级文件名和目录名")

    listFilesAndDirs(path)

    // getAllFiles(path).foreach(println)
    // getFiles(path).foreach(println)
    // getDirs(path).foreach(println)
  }

  def getHdfs(path: String) = {
    val conf = new Configuration()
    FileSystem.get(URI.create(path), conf)
  }

  def getFilesAndDirs(path: String): Array[Path] = {
    val fs = getHdfs(path).listStatus(new Path(path))
    FileUtil.stat2Paths(fs)
  }
  /**************直接打印************/

  /**
   * 打印所有的文件名,包括子目录
   */
  def listAllFiles(path: String) {
    val hdfs = getHdfs(path)
    val listPath = getFilesAndDirs(path)
    listPath.foreach(path => {
      if (hdfs.getFileStatus(path).isFile())
        println(path)
      else {
        listAllFiles(path.toString())
      }
    })
  }

  /**
   * 打印一级文件名
   */
  def listFiles(path: String) {
    getFilesAndDirs(path).filter(getHdfs(path).getFileStatus(_).isFile()).foreach(println)
  }

  /**
   * 打印一级目录名
   */
  def listDirs(path: String) {
    getFilesAndDirs(path).filter(getHdfs(path).getFileStatus(_).isDirectory()).foreach(println)
  }

  /**
   * 打印一级文件名和目录名
   */
  def listFilesAndDirs(path: String) {
    getFilesAndDirs(path).foreach(println)
  }

  /**************直接打印************/
  /**************返回数组************/
  def getAllFiles(path: String): ArrayBuffer[Path] = {
    val arr = ArrayBuffer[Path]()
    val hdfs = getHdfs(path)
    val listPath = getFilesAndDirs(path)
    listPath.foreach(path => {
      if (hdfs.getFileStatus(path).isFile()) {
        arr += path
      } else {
        arr ++= getAllFiles(path.toString())
      }
    })
    arr
  }

  def getFiles(path: String): Array[Path] = {
    getFilesAndDirs(path).filter(getHdfs(path).getFileStatus(_).isFile())
  }

  def getDirs(path: String): Array[Path] = {
    getFilesAndDirs(path).filter(getHdfs(path).getFileStatus(_).isDirectory())
  }

  /**************返回数组************/
}

6. spark 的 wordCount 示例。

package com.xudong

import org.apache.spark.mllib.linalg.{Matrices, Matrix}
import org.apache.spark.{SparkContext, SparkConf}

object TestSparkHdfs {

  def main(args: Array[String]): Unit = {

    val conf=new SparkConf().setAppName("SparkHive").setMaster("local")   //可忽略,已经自动创建了
    val sc=new SparkContext(conf)  //可忽略,已经自动创建了

    val textFile = sc.textFile("hdfs://rebuildb.fdfp75.com:9500/tmp/spark/test/workd.txt");
    val counts = textFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _);
    counts.saveAsTextFile("hdfs://rebuildb.fdfd75.com:9500/tmp/spark/test/wordcount/output");

  }

}
package com.xudong

import org.apache.spark.mllib.linalg.{Matrices, Matrix}
import org.apache.spark.{SparkContext, SparkConf}

object WordCountLocal {

  def main(args: Array[String]) {

    /**
     * SparkContext 的初始化需要一个SparkConf对象
     * SparkConf包含了Spark集群的配置的各种参数
    */
    val conf = new SparkConf()
      .setMaster("local")         // 启动本地化计算
      .setAppName("testRdd")      // 设置本程序名称

    // Spark程序的编写都是从SparkContext开始的
    val sc = new SparkContext(conf)

    // 以上的语句等价与val sc=new SparkContext("local","testRdd")
    val data = sc.textFile("E:\\4work\\27java\\1_1_Movie_Recommend\\maven_scala_test\\src\\main\\resources\\hello.txt") // 读取本地文件

    data.flatMap(_.split(" "))      // 下划线是占位符,flatMap是对行操作的方法,对读入的数据进行分割
      .map((_, 1))                         // 将每一项转换为key-value,数据是key,value是1
      .reduceByKey(_ + _)                  // 将具有相同key的项相加合并成一个
      .collect()                           // 将分布式的RDD返回一个单机的scala array,在这个数组上运用scala的函数操作,并返回结果到驱动程序
      .foreach(println)                   // 循环打印
  }

}

 

学习交流






用户登录


     QQ登录

我的标签


随机文章

  • 解决Nginx配置负载均衡时invalid host in upstream报错当前平台: windows nginx版本…… 查看详情
  • 红米note3 S线刷MUI版本,解决手机卡顿刷机工具下载完成后安装(如遇到系统安全提示请…… 查看详情
  • Windows下hadoop环境搭建之NameNode启动报错关于hadoop的环境搭建,网上也有各种各样…… 查看详情

友情链接





Copyright© 2016-2021 北桥苏 & 版权所有    赣ICP备16002525号-1
如有侵权、不妥之处,请联系站长的客服并出示版权证明以便删除。敬请谅解! 站长客服联系方式:QQ2652364582