mongodb拆库分表脚本
脚本功能:
1. 将指定的报告文件按照指定的字段、切库切表策略切分
2. 将切分后的文件并发导入到对应的Mongodb中
3. 生成日志文件和done标识文件
使用手册:
-h 打印帮助信息,并退出";
-f 需要切分的数据文件";
-g 清理昨日或历史全部数据: 1 昨日数据 2 历史全部数据";
-k 拆分字段在文件中列数,从1开始";
-o 需要切分的数据文件格式 tsv或csv ";
-d 切分的库数目";
-t 切分的表数目";
-m 切分后,需要入库的mongodb未拆分库名,比如拆分前cpc, 拆分后cpc_01";
-c 切分后,需要入库的mongodb未拆分库名,比如拆分前cpc, 拆分后cpc_0102";
-a 入库fieldFile";
-p 配置文件",
使用步骤:
1. 在配置文件中设置日志、切割后数据临时路径$LOG_HOME 和 $DATA_SPLIT_HOME目录,如果不存在,则手动创建;
在配置文件中设置目标Mongodb参数信息,用来作为导入数据的目标库;
在配置文件中设置Mongodb程序的主目录$MONGO;
2. 按照具体的参数意义,仿照下面的格式执行脚本:
举例:./mongo-split-importer.sh -f /data/shell/test.ata -g 1 -o tsv -k 3 -d 3 -t 3 -m idea -c idea -p ../conf/demeter_conf_qa.sh -a ../conf/idea-head-file
-f 切分目标文件 -o 文件格式 tsv -k 切割字段,第三个 -d 切割成3个库 -t 每个库3个表
-m 导入的mongodb未拆分名称idea -c 导入的mongodb未拆分表名idea -p 环境配置文件 -a 导入目标表的fieldFile文件 -g 清理昨日数据
mongo-split-importer.sh执行脚本:
#!/bin/bash
SPLITFILE="" #目标切割文件
FILEFORMAT="" # 目标切割文件格式 , \t
FILEFORMATNAME="" #切割目标文件格式名称 csv tsv
SPLITKEY=1
SPLITDBNUM="" #目标切割库数目
SPLITTBNUM="" #目标切割表数目
IMPORTDBNAME="" # 目标入库未分割库名
IMPORTTBNAME="" #目标入库未切割表名
PROFILE="" #配置文件
FIELDFILE="" #入库fieldFile
CLEAN=0 #清理数据, 0:默认不清理, 1 : 清理昨日的数据 2: 清理所有以前的数据
SPILTTMPDIR="" #目标切割文件存放临时目录
FULLPATH=$(cd `dirname $0`;pwd -P)
SCRIPTFILE=`basename $0`
TOTLE_RECORD_NUM=0 #文件切割前的记录条目
SUBFILE_RECORD_NUM=0 #切割后所有文件汇总的记录条目
_mongo_count="-1"
#------------------------------------------------函数---------------------------------------------------------------
function usage(){
echo "$SCRIPTFILE - 分库分表后将数据导数据到mongodb"
echo "SYNOPSIS"
echo "OPTIONS"
echo " -h 打印帮助信息,并退出";
echo " -f 需要切分的数据文件";
echo " -g 是否清理历史数据,默认不清理 1:清理昨日数据 2:清理以前所有数据";
echo " -k 拆分字段在文件中列数,从1开始";
echo " -o 需要切分的数据文件格式 tsv或csv ";
echo " -d 切分的库数目";
echo " -t 切分的表数目";
echo " -m 切分后,需要入库的mongodb未拆分库名,比如拆分前cpc, 拆分后cpc_01";
echo " -c 切分后,需要入库的mongodb未拆分库名,比如拆分前cpc, 拆分后cpc_0102";
echo " -a 入库fieldFile";
echo " -p 配置文件,绝对或相对路径文件",
exit
}
function setFileFormat(){
FILEFORMATNAME=$1
case $1
in
csv) FILEFORMAT=",";;
tsv) FILEFORMAT="\t";;
*) echo "unknow profile -o $1"; usage;;
esac
}
while getopts ':hf:g:o:k:d:t:a:p:m:c:' OPTION
do
case $OPTION
in
h) usage;;
f) SPLITFILE=$OPTARG;;
g)CLEAN=$OPTARG;;
o) setFileFormat $OPTARG;;
k) SPLITKEY=$OPTARG;;
d) SPLITDBNUM=$OPTARG;;
t) SPLITTBNUM=$OPTARG;;
a) FIELDFILE=$OPTARG;;
p) PROFILE=$OPTARG;;
m) IMPORTDBNAME=$OPTARG;;
c) IMPORTTBNAME=$OPTARG;;
:) echo "选项 \"-$OPTARG\" 后面缺少对应值, 将使用默认值";;
\?)echo " 错误的选项 -$OPTARG, 将退出"; usage;;
esac
done
#记录日志信息
function logInfo(){
echo "[`date +"%Y-%m-%d %H:%M:%S"`] $@ " | tee -a $LOGFILE
}
function checkError(){
if [ $? -ne 0 ]; then
echo "[`date +"%Y-%m-%d %H:%M:%S,%s"`][$SCRIPTFILE, $$] ERROR OCCURS! - $1" | tee -a $ERRORFILE
exit 1;
fi
}
function check_ready() {
tmp_done_file=`printf "$reportDoneFile" "$TABLE" "$1"`
while [ "$isok" = "false" ]; do
rsync --list-only ${tmp_done_file}
if [ $? -eq 0 ]; then
isok="true";
break;
fi
if [ "$isok" = "false" ]; then
sleep 300
fi
time_now=`date +%s`
if [ `expr ${time_now} - ${time_start}` -ge $max_interval ]; then
return 255;
fi
done
return 0;
}
#从数据库列表里选择主库
function selectMongoMaster(){
tmp="TARGET_MONGO_HOST_LIST_0$1"
TMP_HOST=${!tmp}
echo $TMP_HOST
#replica set
for DUBHE_MONGO_HOST in $TMP_HOST; do
if [ $? -eq 0 ] ; then
break;
fi
done
# single server
#for DUBHE_MONGO_HOST in $TMP_HOST; do
#TARGET_MONGO_HOST=$DUBHE_MONGO_HOST
#echo $TARGET_MONGO_HOST
#done
}
#切割
function split() {
logInfo "spilt data file"
echo "split db num"$SPLITDBNUM
echo "split tb num"$SPLITTBNUM
echo "Start to split file: "$SPLITFILE
awk '
BEGIN {
FS="'${FILEFORMAT}'";
}
ARGIND==1{
#分库分表
DBN=$'${SPLITKEY}' % '${SPLITDBNUM}' + 1;
TBN=int($'${SPLITKEY}' / '${SPLITDBNUM}')
TBN=TBN % '${SPLITTBNUM}' + 1;
DBN="0"DBN;
TBN="0"TBN;
print $0 > "'${SPILTTMPDIR}'""/""'${IMPORTTBNAME}'""_"DBN""TBN
}
END {
}
' ${SPLITFILE};
ls $SPILTTMPDIR
echo "Split file successfully : "$SPLITFILE
}
#导入
function import() {
#importData
local iter=1;
while [ $iter -le $SPLITDBNUM ]; do
thread_import $iter &
iter=`expr $iter + 1`
done
#wait for child-threads
wait;
}
#导入子线程
function thread_import() {
local num=1;
targetFileName=$IMPORTTBNAME"_0"$1"0"$num
targetFile=$SPILTTMPDIR/$IMPORTTBNAME"_0"$1"0"$num
targetDB=$IMPORTDBNAME"_0"$1
targetCollection=$IMPORTTBNAME"_0"$1"0"$num
if [ ! -f $targetFile ]; then
logInfo "spilt file does not exits : " $targetFile
num=`expr $num + 1`
continue
fi
user="TARGET_MONGO_USER_0"$1
TMP_USER=${!user}
password="TARGET_MONGO_PWD_0"$1
TMP_PASSWORD=${!password}
#选择master
selectMongoMaster $1;
#clean dirty data
if [ $CLEAN -gt 0 ]; then
logInfo "$qdate $targetDB.$targetCollection cleaning up dirty data in mongodb"
clean_dirty_data
checkError "whether error occurs during cleaning dirty data from mongodb"
fi
#import data
import2mongo $1 $targetFile $targetDB $targetCollection
#record done file
statusfile="$STATUS_LOG_HOME/$targetFileName.done.`date -d $qdate +"%Y-%m-%d"`"
touch $statusfile
num=`expr $num + 1`
done
logInfo "thread $1 ends"
}
#把指定的文件导到指定的库指定的表,并建立索引,mongodb自身会判断索引是否存在
#不存在的情况下才创建新索引
function import2mongo(){
if [ "$FIELDFILE" != "" ]; then
MONGO_FIELD_FILE=$FIELDFILE
else
MONGO_FIELD_FILE=$FULLPATH/../conf/${IMPORTTBNAME}-head-file
fi
DATAFILE=$2
if [ ! -f $DATAFILE ]; then
logInfo "mongodb [${DB}.${COLL}] imported 0 objects"
return 0
fi
TMPLOGFILE=$INFO_LOG_HOME/$DB.$COLL.tmp.log
tmp=$?
if [ "$tmp" != "0" ]; then
return $tmp
fi
#data check
_mongo_count=`tail $TMPLOGFILE | grep imported`
_mongo_count=`expr 0$_mongo_count + 0`
#start to ensure index
ensureIndex
logInfo "mongodb [${DB}.${COLL}] imported $_mongo_count objects"
return $tmp
}
function ensureIndex(){
}
#垃圾数据清理
function clean_dirty_data(){
day=`date -d ${1:-' -1day'} +"%y%m%d"`
if [ $CLEAN -eq 1 ]; then
_mongo_condition="{\"_id\":{\"\$gte\":\"${day}_0\",\"\$lte\":\"${day}_9\"}}"
else
_mongo_condition="{\"_id\":{\"\$lte\":\"${day}_9\"}}"
fi
logInfo "waiting for the clean task.."
echo $_mongo_condition
tmp=$?
if [ "$tmp" != "0" ]; then
return $tmp
fi
sleep 5s
logInfo "dirty data cleaned: "$targetDB $targetCollection $dirtyCount
echo "dirty data cleaned: "$targetDB $targetCollection $dirtyCount
return $tmp
}
#parameter check
function checkParams() {
if [ 1 -ne $CLEAN -a 2 -ne $CLEAN ]; then
logInfo "-g the parameter clean is not in [1, 2] : "$CLEAN
return 1;
fi
if [ $FILEFORMAT != "," -a $FILEFORMAT != "\t" ]; then
logInfo "-o the parameter file format is not in [csv, tsv] : "$FILEFORMAT
return 1;
fi
if [ $SPLITKEY -lt 1 ]; then
logInfo "-k split key must not be less than 1 : "$SPLITKEY
return 1;
fi
if [ $SPLITDBNUM -lt 1 ]; then
logInfo "-d database number must not be less than 1 : "$SPLITDBNUM
return 1;
fi
if [ $SPLITTBNUM -lt 1 ]; then
logInfo "-t collection number must not be less than 1 : "$SPLITTBNUM
return 1;
fi
if [ ! -f $FIELDFILE ]; then
logInfo "-a field file is not a common file or not exits : "$FIELDFILE
return 1;
fi
if [ "" = $IMPORTDBNAME ] ; then
logInfo "-m import database name is empty : "$IMPORTDBNAME
return 1;
fi
if [ "" = $IMPORTTBNAME ] ; then
logInfo "-m import table name is empty : "$IMPORTTBNAME
return 1;
fi
}
#主函数
function main() {
set +x
echo "check split file and profile: " $SPLITFILE $PROFILE
if [ ! -f $SPLITFILE ]; then
echo "-f split file is not a common file or not exits : "$SPLITFILE
return 1;
fi
if [ ! -f $PROFILE ]; then
echo "-p profile file is not a common file or not exits : "$PROFILE
return 1;
fi
source $PROFILE
qdate=`date +"%Y-%m-%d"`
last_day=`date -d "-1day" +"%Y-%m-%d"`
BASEFILENAME=$(basename $SPLITFILE)
echo "base split file name is : "$BASEFILENAME
if [ ! -d $LOG_HOME ] ; then
logInfo " log home is not a common directory or not exits : "$LOG_HOME
return 1;
fi
LOGFILE=$INFO_LOG_HOME/$BASEFILENAME.$qdate.log
if [ -f $LOGFILE ]; then
mv $LOGFILE $LOGFILE.$last_day
fi
touch $LOGFILE
ERRORFILE=$ERROR_LOG_HOME/$BASEFILENAME.error.log
if [ -f $ERRORFILE ]; then
mv $ERRORFILE $ERRORFILE.$last_day
fi
touch $ERRORFILE
#空行
echo
echo
logInfo "start to check parameters!"
checkParams
checkError "whether error occurs during check parameters : $SPLITFILE"
#空行
echo
echo
logInfo "start to split file: "$SPLITFILE
if [ ! -d $DATA_SPLIT_HOME ] ; then
logInfo " data split home is not a common directory or not exits : "$DATA_SPLIT_HOME
return 1;
fi
SPILTTMPDIR=$DATA_SPLIT_HOME/$BASEFILENAME
echo "split temple directory : "$SPILTTMPDIR
if [ -d ${SPILTTMPDIR} ]; then
rm -rf ${SPILTTMPDIR}
fi
mkdir -p ${SPILTTMPDIR}
split
checkError "whether error occurs during split data : $SPLITFILE"
logInfo "split data completely : $SPLITFILE"
statusfile=$STATUS_LOG_HOME/$BASEFILENAME".split.done."$qdate
touch ${statusfile}
#空行
echo
echo
logInfo "start to import split file to mongodb"
import
logInfo "import data completely : $SPLITFILE"
statusfile=$STATUS_LOG_HOME/$BASEFILENAME".import.done."$qdate
touch ${statusfile}
#空行
echo
echo
#remove temple directory
# if [ -d ${SPILTTMPDIR} ]; then
# rm -rf ${SPILTTMPDIR}
# fi
}
#-------------------------------------------------入口----------------------------------------------------------------
source /etc/profile
demeter_conf_cpc_qa.sh 脚本:
#!/bin/bash
source /etc/profile
#logger path
INFO_LOG_HOME="${LOG_HOME}/info"
STATUS_LOG_HOME="${LOG_HOME}/status"
if [ ! -d $ERROR_LOG_HOME ]; then
if [ ! -d $INFO_LOG_HOME ]; then
mkdir -p $INFO_LOG_HOME
fi
if [ ! -d $STATUS_LOG_HOME ]; then
mkdir -p $STATUS_LOG_HOME
fi
if [ ! -d $DATA_HOME ]; then
mkdir -p $DATA_HOME
fi
#data path for source and target data path
DATA_SPLIT_HOME=/data/demeter/sdata
#import target mongodbs
TARGET_MONGO_PORT_01=XXX
TARGET_MONGO_USER_01=XXX
TARGET_MONGO_PWD_01=XXX
TARGET_MONGO_HOST_LIST_01="test01.mongodb01:$TARGET_MONGO_PORT_01 test01.mongodb02:$TARGET_MONGO_PORT_01 test01.mongodb03:$
TARGET_MONGO_PORT_01"
TARGET_MONGO_PORT_02=XXX
TARGET_MONGO_USER_02=XXX
TARGET_MONGO_PWD_02=XXX
TARGET_MONGO_HOST_LIST_02="testt02.mongodb01:$TARGET_MONGO_PORT_02 test02.mongodb02:$TARGET_MONGO_PORT_02 test02.mongodb03:$
TARGET_MONGO_PORT_02"
TARGET_MONGO_PORT_03=XXX
TARGET_MONGO_USER_03=XXX
TARGET_MONGO_PWD_03=XXX
TARGET_MONGO_HOST_LIST_03="test03.mongodb01:$TARGET_MONGO_PORT_03 test03.mongodb02:$TARGET_MONGO_PORT_03 test03.mongodb03:$
TARGET_MONGO_PORT_03"
#mongodb utils
MONGO=/opt/mongodb
xuri-cpc-head-file
a
b
c
d
e
f
g
h
i
j
k
l
m
n
0
p
q
r
s
host:
XX.XX.XX.XX test01.mongodb01
XX.XX.XX.XX test01.mongodb02
XX.XX.XX.XX testt01.mongodb03
XX.XX.XX.XX test02.mongodb01
XX.XX.XX.XX test02.mongodb02
XX.XX.XX.XX test02.mongodb03
XX.XX.XX.XX test03.mongodb01
XX.XX.XX.XX test03.mongodb02
XX.XX.XX.XX test03.mongodb03
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341