添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

Java程序中产生的数据,如果导入oracle或者mysql库,可以通过jdbc连接insert批量操作完成,但是当前版本的hive并不支持批量insert操作,因为需要先将结果数据写入hdfs文件,然后插入Hive表中。

package com.enn.idcard;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
 * <p>Description: </p>
 * @author kangkaia
 * @date 2017年12月26日 下午1:42:24
public class HiveJdbc {
    public static void main(String[] args) throws IOException {
    	List<List> argList = new ArrayList<List>();
		List<String> arg = new ArrayList<String>();
		arg.add("12345");
		arg.add("m");
		argList.add(arg);
		arg = new ArrayList<String>();
		arg.add("54321");
		arg.add("f");
		argList.add(arg);
//		System.out.println(argList.toString());
		String dst = "/test/kk.txt";
		createFile(dst,argList);
		loadData2Hive(dst);
     * 将数据插入hdfs中,用于load到hive表中,默认分隔符是"\001"
     * @param dst
     * @param contents
     * @throws IOException
    public static void createFile(String dst , List<List> argList) throws IOException{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path dstPath = new Path(dst); //目标路径
        //打开一个输出流
        FSDataOutputStream outputStream = fs.create(dstPath);
        StringBuffer sb = new StringBuffer();
        for(List<String> arg:argList){
			for(String value:arg){
				sb.append(value).append("\001");
			sb.deleteCharAt(sb.length() - 4);//去掉最后一个分隔符
			sb.append("\n");
        sb.deleteCharAt(sb.length() - 2);//去掉最后一个换行符
        byte[] contents =  sb.toString().getBytes();
        outputStream.write(contents);
        outputStream.close();
        fs.close();
        System.out.println("文件创建成功!");
     * 将HDFS文件load到hive表中
     * @param dst
    public static void loadData2Hive(String dst) {
    	String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
    	String CONNECTION_URL = "jdbc:hive2://server-13:10000/default;auth=noSasl";
    	String username = "admin";
        String password = "admin";
        Connection con = null;
		try {
			Class.forName(JDBC_DRIVER);
			con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password);
			Statement stmt = con.createStatement();
			String sql = " load data inpath '"+dst+"' into table population.population_information ";
			stmt.execute(sql);
			System.out.println("loadData到Hive表成功!");
		} catch (SQLException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}finally {
			// 关闭rs、ps和con
			if(con != null){
				try {
					con.close();
				} catch (SQLException e) {
					e.printStackTrace();

注意:本例使用mvn搭建,conf配置文件放在src/main/resources目录下。

Hive提供的默认文件存储格式有textfile、sequencefile、rcfile等。用户也可以通过实现接口来自定义输入输的文件格式。

在实际应用中,textfile由于无压缩,磁盘及解析的开销都很大,一般很少使用。Sequencefile以键值对的形式存储的二进制的格式,其支持针对记录级别和块级别的压缩。rcfile是一种行列结合的存储方式(text file和sequencefile都是行表[row table]),其保证同一条记录在同一个hdfs块中,块以列式存储。一般而言,对于OLTP而言,行表优势大于列表,对于OLAP而言,列表的优势大于行表,特别容易想到当做聚合操作时,列表的复杂度将会比行表小的多,虽然单独rcfile的列运算不一定总是存在的,但是rcfile的高压缩率确实减少文件大小,因此实际应用中,rcfile总是成为不二的选择,达观数据平台在选择文件存储格式时也大量选择了rcfile方案。

通过hdfs导入hive的表默认是textfile格式的,因此可以改变存储格式,具体方法是先创建sequencefile、rcfile等格式的空表,然后重新插入数据即可。

insert overwrite table seqfile_table select * from textfile_table; 
insert overwrite table rcfile_table select * from textfile_table;
Java程序中产生的数据,如果导入oracle或者mysql库,可以通过jdbc连接insert批量操作完成,但是当前版本的hive并不支持批量insert操作,因为需要先将结果数据写入hdfs文件,然后插入Hive表中。package com.enn.idcard;import java.io.IOException;import java.sql.Connection;import
1.一般导入分区数据用: INSERT OVERWRITE TABLE target_table PARTITION (store_day=20200303) SELECT column1,column2 FROM source_table WHERE store_day=20200303; 但如果有很多个分区,逐个导的话很麻烦,而且每个分区一个job,要执行很多个jpb,效率很低。 2.批量...
使用JDBC连接MySQL数据库进行数据插入的时候,特别是大批量数据连续插入(10W+),如何提高效率呢? 在JDBC编程接口中Statement 有两个方法特别值得注意: void addBatch() throws SQLException Adds a set of parameters to this PreparedStatement object’s batch of commands. int[] executeBatch() throws SQLException Submits a batch of commands to the database for exec
1.批量执行SQL语句 当需要成批插入或者更新记录时,可以采用Java批量更新机制,这一机制允许多条语句一次性提交给数据批量处理。通常情况下比单独提交处理更有效率 JDBC批量处理语句包括下面三个方法: addBatch(String):添加需要批量处理的SQL语句或是参数; executeBatch():执行批量处理语句; clearBatch():清空缓存的数据 通常我们会遇到两种批量执行SQL语句的情况: 多条SQL语句的批量处理; 一个SQL语句的批量传参; 2 .高效的批量插入 hive是facebook开源,并捐献给了apache组织,作为apache组织的顶级项目(hive.apache.org)。 hive是一个基于大数据技术的数据仓库(DataWareHouse)技术,主要是通过将用户书写的SQL语句翻译成MapReduce代码,然后发布任务给MR框架执行,完成SQL 到 MapReduce的转换。可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能。 Hive是一个数据仓库 Hive构建在HDFS上,可以存储海量数据。 3. 从Hive中读取数据,可以使用Spark SQL或DataFrame API。 4. 将读取到的数据转换为HBase中的数据格式,例如使用HBase API中的Put类。 5. 将转换后的数据写入HBase中,可以使用HBase API中的Table类。 6. 关闭SparkSession对象和HBase连接。 需要注意的是,导入HBase的数据需要根据HBase表的结构进行转换,例如将Hive表中的列映射到HBase表中的列族和列。同时,需要根据实际情况设置HBase的配置参数,例如Zookeeper的地址和端口等。 ### 回答2: 要将Hive数据批量导入HBase,需要使用Scala编写Spark程序。具体步骤如下: 1. 配置HBase、Hive和Spark的环境。在集群上安装好HBase、Hive和Spark,并确保它们可以正常运行。 2. 创建一个Scala项目,并将所需的依赖项添加到项目中。这些依赖项包括:HBase的Java API、Spark的Core APIHiveJDBC驱动程序。可以在构建管理工具中声明这些依赖项,如SBT或Maven。 3. 编写Spark程序。程序主要分为以下几个步骤: a. 从Hive表中读取数据。可以使用HiveJDBC驱动程序连接到Hive,并执行SQL查询语句来读取数据。 b. 将数据转换为HBase Put对象。根据HBase的数据模型,需要将每条数据转换为HBase的Put对象,包括Put对象的行键、列族、列名和值。 c. 将Put对象保存到HBase中。使用HBase的Java API将转换后的Put对象批量保存到HBase中。 4. 测试程序。可以在本地模式下运行程序,或者将程序部署到生产环境中进行测试。 5. 部署程序。将打包好的程序部署到Spark集群中,提交作业并监控作业的执行情况。 总之,将Hive数据批量导入HBase需要使用Scala编写Spark程序,并确保环境配置正确、依赖项已添加、程序编写正确、测试通过和部署正常。这项工作比较复杂,需要对HBase、Hive和Spark有一定的了解和经验。 ### 回答3: Scala版本,Spark将Hive数据批量导入到HBase,可以通过以下步骤实现。 1. 导入Hive表:首先需要在Hive中创建表,并导入需要导入到HBase的数据。可以使用以下命令创建Hive表: CREATE EXTERNAL TABLE hive_table (key int, value string) STORED AS TEXTFILE LOCATION '/path/to/hive_table'; 2. 导入到Spark:使用Spark SQL将Hive导入到Spark中。可以使用以下代码: val spark = SparkSession.builder().appName("HiveToHBase").enableHiveSupport().getOrCreate() val df = spark.sql("select * from hive_table") 3. 将数据转换成HBase的格式:将Spark数据转换成HBase的格式,并指定列族名。可以使用以下代码: import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.util.Bytes val columnFamily = Bytes.toBytes("cf") val putRdd = df.rdd.map{row => val key = row.getInt(0) val value = row.getString(1) val put = new Put(Bytes.toBytes(key)) put.addColumn(columnFamily, Bytes.toBytes("col"), Bytes.toBytes(value)) (key, put) 4. 保存到HBase:最后,将数据保存到HBase中。可以使用以下代码: val tableName = "hbase_table" val config = HBaseConfiguration.create() config.set("hbase.zookeeper.quorum", "localhost") config.set("hbase.zookeeper.property.clientPort", "2181") val connection = ConnectionFactory.createConnection(config) val table = connection.getTable(TableName.valueOf(tableName)) putRdd.map{case (_, put) => put}.saveAsNewAPIHadoopDataset(createHadoopConf(tableName, config)) 5. 完成操作:完成所有操作后,关闭连接和SparkSession。可以使用以下代码: table.close() connection.close() spark.stop() 以上就是使用Scala版本的Spark将Hive数据批量导入到HBase的详细步骤。需要注意的是,在实际操作中需要根据具体情况进行适当调整。