hadoop集群同步實現

2021-07-30 12:01:28 字數 4722 閱讀 7143

#!/usr/bin/env python

#coding=utf-8

#scribe日誌接收存在小集群到大集群之間, distcp 同步失敗的情況,需要手動進行補入。

#1、如果查詢補入的日誌量少,則可以之間用指令碼處理。如果量大,則使用 hadoop 提交job。

# hadoop job 提交方式:

#   hadoop jar /usr/local/hadoop-2.4.0/share/hadoop/tools/lib/hadoop-distcp-2.4.0.jar -m 100 hdfs://scribehadoop/scribelog/common_act/2016/08/02/13/  /file/realtime/distcpv2/scribelog/common_act/2016/08/02/13  --update 

#  --update 引數表示如果目標位址目錄存在,則更新該目錄中的內容。

#手動同步指令碼使用方法: python manual_check_sync.py  dst_path  

#指令碼完成大集群和小集群之間的目錄大小比較,目錄檔案比較。 輸出差異檔案列表。最後完成同步入庫。

import sys,os,commands,re

import logging,logging.handlers 

module=sys.argv[1].split("scribelog")[1]

logger1 = logging.getlogger('mylogger1')

logger1.setlevel(logging.debug)

# 建立乙個handler,用於寫入日誌檔案

filehandle = logging.filehandler('log/test.log',mode='a')

# 再建立乙個handler,用於輸出到控制台

consehandle = logging.streamhandler()

# 定義handler的輸出格式formatter

formatter = logging.formatter('[%(asctime)s]:%(message)s',datefmt='%f %t')

#formatter = logging.formatter('[%(asctime)s-line:%(lineno)d-%(levelname)s]:%(message)s',datefmt='%f %t')

filehandle.setformatter(formatter)

consehandle.setformatter(formatter)

logger1.addhandler(filehandle)

logger1.addhandler(consehandle)

little_cluster="hdfs:"

large_cluster="hdfs://ns1"

#logger1.info("源目錄(小集群):%s%s" % ( little_cluster,module))

logger1.info("源目錄(小集群):%s%s" %(little_cluster,module))

logger1.info("目標目錄(大集群):%s%s" %(large_cluster,sys.argv[1]))

#統計目錄大小等情況

du_little=commands.getoutput("hadoop fs -count " + little_cluster + module)

du_large=commands.getoutput("hadoop fs -count " + large_cluster + sys.argv[1])

#獲取的值是str型別,所以需要轉為list來做比較。

logger1.info("                dir_count filecount contentsize ")

logger1.info("小集群目錄資訊:%s" %(du_little))

logger1.info("大集群目錄資訊:%s" %(du_large))

#python的str類有split方法,只能根據指定的某個字元分隔字串,re模組中提供的split方法可以定義多個分隔符。這裡可以用單個空格作為分隔即可。

du_little=re.split('         |           | ' ,du_little)

du_large=re.split('         |           | ' ,du_large)

#du_little=du_little.split(" ")

#du_large=du_large.split(" ")

#print du_large,du_little

#print du_large[3],du_little[3]

#print du_large[6],du_little[6]

#print du_large[7],du_little[7]

#if du_little[3] == du_large[3]  and du_little[6] == du_large[6] and du_little[7] == du_large[7]:

logger1.info("大小集群檔案數量、大小一致,不需要同步")

exit()

#如果大小不一致,取出目錄下所有檔案

little_list=commands.getoutput("hadoop fs -lsr " + little_cluster + module + "|grep -v \"^d\"" )

large_list =commands.getoutput("hadoop fs -lsr " + large_cluster + sys.argv[1]+"|grep -v \"^d\"")

#logger1.info( "小集群情況:%s " %(little_list))

#logger1.info( "大集群情況:%s ")%(large_list ))

list1=

list2=

lost_list=

for i in  little_list.split("\n"):

for i in  large_list.split("\n"):

#logger1.info("小集群目錄檔案:",list1

#logger1.info("大集群目錄檔案" ,list2

logger1.info("對比大小集群檔案---》未同步檔案列表:")

for i in list1:

if i not in list2:

logger1.info(i) 

logger1.info(lost_list)

#拉取小集群檔案到本地tmp目錄中

for i in lost_list:

s=commands.getstatusoutput("hadoop fs -get " + little_cluster + i+" /tmp/")

logger1.info("拉取小集群檔案到本地tmp目錄中%s" %s )

#入庫到大集群

for i in lost_list:

logger1.info(i)

j=commands.getstatusoutput("sudo su - datacopy -s /bin/bash -c '/usr/local/hadoop-2.4.0/bin/hadoop fs -put  /tmp/" +i.split("/")[-1] + "  /file/realtime/distcpv2/scribelog"+i+" '" )

logger1.info(j)

if j[0] == 0 :

logger1.info(" %s + 同步完成" %(i))

#clear tmp file

for i  in lost_list:

commands.getstatusoutput("rm -f /tmp/"+i.split("/")[-1] )

#當同步目錄時,可能會出現put的時候提示沒有目錄存在。參考趙兵的 put  -  src dest  方法,是否可以避免此類問題有待驗證。

du_little=commands.getoutput("hadoop fs -count " + little_cluster + module)

du_large=commands.getoutput("hadoop fs -count " + large_cluster + sys.argv[1])

logger1.info("                  dir_count       filecount       contentsize ")

logger1.info("小集群目錄資訊:%s" %(du_little))

logger1.info("大集群目錄資訊:%s" %(du_large))

#python的str類有split方法,但是這個split方法只能根據指定的某個字元分隔字串,re模組中提供的split方法可以用來做這件事情

du_little=re.split('         |           | ' ,du_little)

du_large=re.split('         |           | ' ,du_large)

if du_little[3] == du_large[3]  and du_little[6] == du_large[6] and du_little[7] == du_large[7]:

logger1.info("小集群檔案數量、大小一致,不需要同步")

exit()

原文:樂搏學院

hadoop集群時間同步

時間同步的方式 找乙個機器,作為時間伺服器,所有的機器與這台集群時間進行定時同步,比如,每隔十分鐘,同步一次時間。1 檢查 ntp 是否安裝 rpm qa grep ntp 2 修改 ntp 配置檔案 vi etc ntp.conf修改內容如下 a 修改 1 授權 192.168.134.0 192...

Hadoop集群時間同步

sudo apt get install ntp參考配置ntp服務ntpd ntp.conf 搭建hadoop集群可參考 vim etc ntp.conf 配置客戶端的授權,也就是給指定的機器 客戶端 設定訪問ntp server的許可權,這是通過restrict配置項實現的 local netwo...

集群時間同步 用於hadoop集群

集群時間同步 用於hadoop集群 時間同步方式 找乙個機器,作為時間伺服器,所有機器與這條集群時間進行定時的同步,比如每隔10分鐘,同步一次時間 配置時間同步具體實操 時間伺服器配置 必須是root使用者 檢查ntp是否安裝 rpm qa grep ntp 修改ntp配置檔案 vim etc ne...