业务:需要从一个数据库查询百万级数据,在java程序中插入到另一个oracle数据库中代码:
private final int persize = 1000;
/**
* 推送数据-流程
* @param tableCode 表名
* @param startTime 开始时间
* @param endTime 结束时间
* @return
*/
public boolean pushFrData(String username,String tableCode,String tableName,String startTime,String endTime){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
System.out.println("导入数据到名录库!");
boolean boo = false;
//表名集合
String [] str = tableCode.split(",");
String [] names = tableName.split(",");
startTime = startTime==""?"2000-01-01":startTime;
endTime = endTime==""?getCurrentDate():endTime;
//System.out.println("service 时间 "+startTime+" > "+endTime);
String start_Time = "to_date('"+startTime+"','%Y-%m-%d')";
String end_Time = "to_date('"+endTime+"','%Y-%m-%d')";
System.out.println("选择推送 "+str.length+" 张表");
//遍历表名集合
for(int i = 0;i<str.length;i++){
System.out.println("所选数据表: "+str[i]);
//字段集合
String [] arr = fillService.getIdenCode(str[i]);
String iden_code = "";
//遍历字段
for(int j = 0;j<arr.length;j++){
iden_code += ""+arr[j]+",";
}
//表字段
iden_code = iden_code.substring(0,iden_code.length()-1);
//System.out.println(str[i]+"总共 "+arr.length+" 个字段!");
//得到推送数据集合
String sql = "select count(*) from "+str[i] +" where s_ext_timestamp >= "+start_Time+" and s_ext_timestamp < "+end_Time;
System.out.println(sql);
int table_size = Integer.valueOf(frDao.query(sql).get(0).toString());
//System.out.println(table_size/persize);
int times = table_size%persize==0?table_size/persize:table_size/persize+1;
for(int t = 1; t <= times;t++){
int start = (t-1) * persize;
List<Object[]> lists = getData(str[i], iden_code,startTime,endTime,start);
//推送数据
System.out.println("准备导出第 "+t+" 批数据");
push(lists,str[i],iden_code);
System.out.println("已导出第 "+t+" 批数据");
}
try{
if(table_size > 0){
addLog(username,str[i].toString(),names[i].toString(),table_size,"1",sdf.parse(startTime),sdf.parse(endTime));
}
}catch (ParseException e){
System.out.println("日期格式转换异常");
}
}
return boo;
}
/**
* 推送数据
* @param lists 数据集
* @param table 表名
* @param iden_code 字段集
*/
private int push(List<Object []> lists,String table,String iden_code){
boolean boo = false;
int count = 0;
//遍历数据结果集
if(lists.size()>0){
for(int k = 0;k < lists.size();k++){
Object [] obj = lists.get(k);
String val = ""; //将数据转换成String类型
for(Object s:obj){
//val += "'"+s.toString()+"',";
if(s != null){
val += "'"+s.toString()+"'|";
}else{
val += "''|";
}
}
val = val.substring(0,val.length()-1);
String etpsid = "";
String [] iden = iden_code.split(",");
String [] value = val.split("\\|");
String val2 = "";//格式化数据(日期)
if(iden.length == value.length){
//格式化sql语句的时间
for(int i = 0;i<iden.length;i++){
//判断字段是否是date类型
if(getDateType(table, iden[i])){
//格式化字符串 防止出现datetime类型 1900-01-01 00:00:00.0的情况
if(value[i].length() > 4){
String vv = value[i].substring(value[i].length()-3,value[i].length());
if(vv.contains(".")){
val2 += "to_date("+value[i].substring(0,value[i].length()-3)+"','YYYY-MM-DD HH24:MI:SS'),";
}else{
val2 += "to_date("+value[i]+",'YYYY-MM-DD HH24:MI:SS'),";
}
}else{
val2 += "'',";
}
}else{
val2 += value[i]+",";
}
if("ETPSID".equals(iden[i])){
etpsid = value[i];
}
}
val2 = val2.substring(0,val2.length()-1);
}else{
/*System.out.println(iden.length+" : "+value.length);
for(int j = 0 ;j< value.length;j++){
System.out.println(value[j]);
}*/
System.out.println("推送数据和字段不一致");
}
String mlk_table = getMlkTable(table);
String mlk_code = getMlkCode(iden_code,table);
//插入数据sql
//String sql = "insert into "+table+"("+iden_code+") values ("+val2+")";
//生成流水号
String uuid = UUID.randomUUID().toString();
uuid = uuid.replace("-","");
val2 += ",'"+uuid+"'";
String sql = "insert into "+mlk_table+"("+mlk_code+") values ("+val2+")";
if(etpsid != ""){
//System.out.println(etpsid);
//工商增量数据按日依"企业唯一标识"为关键字,更新、追加至名录库表中
String s_sql = "select * from "+mlk_table+" where 企业唯一标识 = "+etpsid;
//System.out.println(s_sql);
int s = mlDao.query(s_sql).size();
if (s > 0){
String update_sql = "";
String [] update_code = mlk_code.split(",");
for(int j = 0;j < iden_code.split(",").length;j++){
//判断字段是否是date类型
if(getDateType(table, iden[j])){
//格式化字符串 防止出现datetime类型 1900-01-01 00:00:00.0的情况
if(value[j].length() > 4){
String vv = value[j].substring(value[j].length() - 3, value[j].length());
if(vv.contains(".")){
update_sql += update_code[j]+" = "+ "to_date("+value[j].substring(0, value[j].length() - 3)+"','YYYY-MM-DD HH24:MI:SS'),";
}else{
update_sql += update_code[j]+" = "+ "to_date("+value[j]+",'YYYY-MM-DD HH24:MI:SS'),";
}
}else{
update_sql += update_code[j]+" = "+ "'',";
}
}else{
update_sql += update_code[j]+" = "+ value[j]+",";
}
//update_sql += update_code[j]+" = "+update_val[j]+",";
}
update_sql += "流水号 = '"+uuid+"'";
update_sql = "update "+mlk_table+" set "+update_sql+" where 企业唯一标识 = "+etpsid;
//System.out.println("/////////// /\n"+update_sql+"\n");
sql = update_sql;
System.out.println("更新 "+mlk_table+" 标识 "+etpsid);
}else{
System.out.println("插入 "+mlk_table+" 标识 "+etpsid);
}
}
//System.out.println(sql);
try{
//循环执行sql
mlDao.execute(sql);
count++;
System.out.println(table+" 推送第 "+count+" 条");
}catch (Exception e){
System.out.println("sql执行异常!");
}
}
System.out.println(table+" 表共插入 "+count+" 条数据!");
}else{
System.out.println(table+" 表共插入 "+count+" 条数据!");
System.out.println("导入数据为空!");
}
return count;
}
5 回答
慕工程0101907
TA贡献1887条经验 获得超5个赞
一个优秀的程序员必须从各方面处理问题,以上各位提出的方法都是传统方法。我也一一试过。然而当数据量过于庞大时,这些方式就显得有些力不从心。
我现在所使用的方法是,将处理庞大数据量的执行效率压力放在oracle client,使用sqlloader方式。
如果是两台服务器之间的数据传输或者推送共享,方法如下:
1 在传输端(向另一台服务器推送数据的服务器)安装oracle client(oracle 轻量级客户端而不是服务端)
2 程序生成csv文件和导入需要的ctl,bat文件(如果你是windows操作系统的话)
/**
* 创建ctl文件
* @param path ctl文件根路径
* @param fileName 文件名
* @param csvPath csv文件存放的根路径
* @param insert_table ctl导入的表名
* @param col ctl导入表的列名
* @return ctl绝对路径
* @throws IOException
*/
private String createCtlFile(String path,String fileName,String csvPath,String insert_table,String col) throws IOException {
String files = path+fileName;
File f = new File(path);
if(!f.exists()){ f.mkdirs();}
if(new File(files).exists()){
new File(files).delete();
}
FileWriter fileWriter = new FileWriter(files);
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("load data\n");
stringBuffer.append("infile '"+csvPath+"'\n");
stringBuffer.append("into table \""+insert_table+"\" append\n");
stringBuffer.append("fields terminated by ',' optionally enclosed by'\"'\n");
stringBuffer.append("TRAILING NULLCOLS\n");
stringBuffer.append(col);
String data = stringBuffer.toString();
fileWriter.write(data);
fileWriter.close();
System.out.println("创建 "+path+fileName+" 文件成功!");
return path+fileName;
}
/**
* 创建 bat文件执行导出命令
* @param path
* @param fileName
* @param ctlPath
* @return
* @throws IOException
*/
private String createBatFile(String path,String fileName,String ctlPath) throws IOException {
String mlk_sid = "admin/123456@ORCL"; //数据库连接用户名/密码@sid
String files = path+fileName;
File f = new File(path);
if(!f.exists()){ f.mkdirs();}
if(new File(files).exists()){
new File(files).delete();
}
FileWriter fileWriter = new FileWriter(files);
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("sqlldr "+mlk_sid+" control="+ctlPath);
String data = stringBuffer.toString();
fileWriter.write(data);
fileWriter.close();
System.out.println("创建 "+path+fileName+" 文件成功!");
return path+fileName;
}
3 程序调用bat文件让oracle client执行ctl文件,oracle会自己将数据传输过去
以下所贴为程序部分代码,保密需要不全请谅解
//创建ctl和bat文件 str[i] 为程序外层for循环遍历的表名 col为根据表名得到的字段名称
String ctl_fileName = file_time+mlk_table+".ctl";
String ctl_rootPath = createCtlFile(ctl_path,ctl_fileName,path+fileName,str[i],col);
String bat_fileName = file_time+mlk_table+".bat";
String bat_rootPath = createBatFile(ctl_path,bat_fileName,ctl_rootPath);
//系统桌面调用执行bat文件
Desktop.getDesktop().open(new File(bat_rootPath));
// 判断何时推送数据 (名录数据库中csv导入完成)
String temp_count = "select count(*) from "+str[i];
while (Integer.valueOf(mlDao.query(temp_count).get(0).toString()) != table_size){
System.out.println("csv导入"+str[i]+"中......");
}
System.out.println("csv已导入成功!");
莫回无
TA贡献1865条经验 获得超7个赞
建议使用JDBC的批处理模式,搜索以下关键字:
addBatch(String query)
executeBatch()
建议1000条左右作为一个batch提交事务。
添加回答
举报
0/150
提交
取消