業務効率化ツールについて
本記事ではデータファイル処理の自動化を行うためのpythonプログラムです。
自分で書いたコードの備忘録を兼ねてコードの記載内容のポイントをまとめた。
python実行環境
環境情報
- windows10 64bit (メモリ、CPUはDaaS環境のため不明)
- python ver 3.10.4
環境構築については以下記事を参考にしてください。
ソースコード 全体
import csv
import os
import glob #globライブラリを使う場合にインポートする ファイルパスを取得する用途
import datetime #現在日時を扱うためのライブラリ 読み込むCSVファイルの選定に利用する
import shutil #ファイル・ディレクトリ移動のための標準ライブラリ
from tkinter import messagebox #python標準のGUIアプリ作成モジュールの利用宣言
import sys
###例外処理部###
def error(Error_ID):
messagebox.showerror('プログラムエラー','エラーコード:'+str(Error_ID)) #関数の呼び出し時の引数に入ったError_IDを出力
sys.exit()
path = os.getcwd() #現在のカレントディレクトリを取得しpath変数へ格納する
print(path)
path = "C:/Users/2465732/AppData/input".replace('/',os.sep) #path変数へ操作したいcsvファイルがある絶対パスを格納する
print(path) #↑で指定したパスをデバック用に表示させる
os.chdir(path) #↑で指定したパスにカレントディレクトリを変更する
print('変更後のディレクトリ:%s' %path) #デバック用ディレクトリが変更されたことを確認する
##inputディレクトリに入っているcsvファイル名を読み込み、本日の日付YYYY-MM-DDの文字列が含まれるCSVファイルを特定する
d_today = datetime.date.today() #今日の日付を取得する
if len(str(d_today)) == 0: #d_todayの中身が空/NULLの時
error('001') #例外処理へ
name_list =glob.glob('*.csv') #.csvファイルの一覧を取得してname_listへ格納
if len(name_list) == 0: #name_listの中身が空/NULLの時
error('002') #例外処理へ
name_list_counter=0
counter_d=0 #今日の日付が着いたCSVファイルのファイル名があるname_listの配列番号を入れる
jg_cnt = 0 #name_listの文字列にd_todayの文字列が1つも含まれない or 2つ以上ある場合の判定識別子
for f in name_list:#name_listの配列数分for分を回す
if str(d_today) in name_list[name_list_counter]:#今日の日付とname_listのファイル名の比較
counter_d=name_list_counter #一致した場合はcounter_dに配列番号を記憶
jg_cnt = jg_cnt + 1
name_list_counter = name_list_counter +1
print("jg_cntの値:%d" %jg_cnt)
if jg_cnt == 0: #name_listの文字列にd_todayの文字列が1つも含まれない場合
error('011') #例外処理へ
elif jg_cnt >= 2: #name_listの文字列にd_todayの文字列が2つ以上含まれる場合
error('012') #例外処理へ
print("本日の日付をファイル名に持つcsvファイルは:name_list[%d] = %sです"%(counter_d,name_list[counter_d])) #デバック用
##csvファイルを配列へ格納する
#Bridge_Domain = [[0 for i in range(2)]for j in range(total_lines)]
host_name =[] #CSVの1列目のHOSTNAMEを格納する用
Bridge_Domain = [] #CSVの3列目のBridge Domainを格納する用
##1回目のファイルロード時は必要なパラメータを変数へ代入する
filename = name_list[counter_d] #開くcsvファイル名を入れる
try:
with open(filename,encoding='utf8',newline='')as f: #./test.csvというファイルを移行"f"というオブジェクトに置き換えて利用
csvreader = csv.reader(f)
counter = 0 #csvファイルの読み出し行数をカウントしておくための変数
for row in csvreader:
# Bridge_Domain[counter][1] = row[1]
# Bridge_Domain[counter][2] = row[2]
# print('Bridge_domain[%d][1]= %s' %(counter,Bridge_Domain[counter][1]))
# print('Bridge_domain[%d][2]= %s' %(counter,Bridge_Domain[counter][2]))
# print('row[1]=%s'%row[1])
host_name.append(row[0])
Bridge_Domain.append(row[2])
print('host_name[%d]=%s'%(counter,host_name[counter]))
print('Bridge_Domain[%d]=%s'%(counter,Bridge_Domain[counter]))
counter = counter +1
# print(counter)
except FileNotFoundError as e:
print(e)
error('021') #例外処理へ
except csv.Error as e:
print(e)
error('022') #例外処理へ
##2回目のファイルロード時は同一BridgeDomain数を比較し変数へ代入する
#filename = "./test.csv"
#print(Bridge_Domain.index(Bridge_Domain[1]))
BUM_counter = [] #同一BridgeDomain数を算出した結果を入れる用
for i in range(counter):
loopcounter = 0 ##loopcouterの初期化
bum_count = 0 ## for文の中だけで一時的に使う
try:
with open(filename,encoding='utf8',newline='')as f: #./test.csvというファイルを移行"f"というオブジェクトに置き換えて利用
csvreader = csv.reader(f)
for row in csvreader:
if Bridge_Domain[i] == row[2]:
# print('Bridge_Domain[%d]と一致'%loopcounter)
bum_count = bum_count + 1
loopcounter =loopcounter + 1
BUM_counter.append(bum_count)
print('Bridge_Domain[%d]の一致数は%dです'%(i,BUM_counter[i]))
except FileNotFoundError as e:
print(e)
error('031') #例外処理へ
except csv.Error as e:
print(e)
error('032') #例外処理へ
BUM_counter[0]='SUM(same_Bridge_Domain)'#リストの先頭はカラム識別情報を入れているので、比較結果ではなく、説明文に入れ直す。
##BUM帯域設定ファイルを読み込む処理
BUM_Bridge_Domain = [] ##BUM設定ファイルのBridge_Domainの値格納用
BUM_Bandwidth_wse = [] ##WSEのBUMの帯域設定値を読み込む用
BUM_Bandwidth_wge = [] ##WGEのBUMの帯域設定値を読み込む用
BUM_filename = "../define_bum_bandwidth.csv"#BUMのStorm-control設定値定義ファイル。ファイル名に合わせて変更する。ディレクトリパスも含めて
#カレントディレクトリの状態はinputなので1つ上の階梯の設定ファイルを読み込む様にする。
BUM_define_counter =0 #設定ファイル行数をカウントしておく
try:
with open(BUM_filename,encoding='utf8',newline='')as BUMread: #./test.csvというファイルを移行"f"というオブジェクトに置き換えて利用
csvreader = csv.reader(BUMread)
for row in csvreader:
BUM_Bridge_Domain.append(row[0])#csvファイルの1カラム目をBUM_Bridge_Domainのリストへ追加
BUM_Bandwidth_wse.append(row[1])#csvファイルの2カラム目をBUM_Bandwidth_wseのリストへ追加
BUM_Bandwidth_wge.append(row[2])#csvファイルの3カラム目をBUM_Bandwidth_wseのリストへ追加
print('BUM_Bridge_Domain[%d]=%s'%(BUM_define_counter,BUM_Bridge_Domain[BUM_define_counter]))
print('BUM_Bandwidth_wse[%d]=%s'%(BUM_define_counter,BUM_Bandwidth_wse[BUM_define_counter]))
print('BUM_Bandwidth_wge[%d]=%s'%(BUM_define_counter,BUM_Bandwidth_wge[BUM_define_counter]))
BUM_define_counter=BUM_define_counter + 1
print('counterの値は:%d'%counter)
print('BUM_define_counterの値は:%d'%BUM_define_counter)
except FileNotFoundError as e:
print(e)
error('041') #例外処理へ
except csv.Error as e:
print(e)
error('042') #例外処理へ
##CSVファイルの書き込み処理
w_filename = "../"+str(d_today)+"result.csv" #出力先パスとファイル名の指定
try:
with open(w_filename,'w',encoding='utf8',newline="\n")as csvfile: #./result.csvというファイルを生成し"csvfile"というオブジェクトにお置き換えて利用
writer = csv.writer(csvfile,lineterminator='\n')
for i in range(counter): #csvファイル行数分書き込みをする
if i==0:#最初の1行目はカラムの説明情報
writer.writerow([host_name[i],Bridge_Domain[i],BUM_counter[i],'SUM(Total_Bandwidth(bps))'])
print('1行目の処理完了')
else:
for j in range(BUM_define_counter): #BUMの設定ファイルの行数分検索して処理する
if BUM_Bridge_Domain[j] == Bridge_Domain[i]:#BUMの設定値ファイルのBridgeDomainの値と読み込んだcsvファイルのBridge_Domainの値が一致するとき
if 'wge' in host_name[i] :#host_nameに'wge'の文字が含まれる時
BWidth1 = int(BUM_counter[i])
BWidth2 = int(BUM_Bandwidth_wge[j])
BWidth = BWidth1 * BWidth2 #ファイル書き込み用変数BWidthに値を代入
elif 'wse' in host_name[i] :#host_nameに'wse'の文字が含まれる時
BWidth1 = int(BUM_counter[i])
BWidth2 = int(BUM_Bandwidth_wse[j])
BWidth = BWidth1 * BWidth2 #ファイル書き込み用変数BWidthに値を代入
else:
print('エラー処理必要')
writer.writerow([host_name[i],Bridge_Domain[i],BUM_counter[i],BWidth]) #csvファイルへ書き込み
print('%d回目の処理完了'%i)
except FileNotFoundError as e:
print(e)
error('051') #例外処理へ
except csv.Error as e:
print(e)
error('052') #例外処理へ
##最後に読み込んだcsvファイルを読込済みディレクトリへ移動する
move_path = shutil.move(path+'/'+str(name_list[counter_d]),path+'/Already_read/'+str(name_list[counter_d]))
指定のディレクトリ内のCSVファイル名一覧の読み込み処理
このブロックでは以下の処理を行う。
- プログラムで扱うカレントディレクトリを指定して、その中にある任意の文字列のcsvファイル名を読み込む
- システム関数で本日の日付をロードする
- 本日の日付を含むcsvファイル名を検索する
import glob #globライブラリを使う場合にインポートする ファイルパスを取得する用途
import datetime #現在日時を扱うためのライブラリ 読み込むCSVファイルの選定に利用する
##inputディレクトリに入っているcsvファイル名を読み込み、本日の日付YYYY-MM-DDの文字列が含まれるCSVファイルを特定する
d_today = datetime.date.today() #今日の日付を取得する
if len(str(d_today)) == 0: #d_todayの中身が空/NULLの時
error('001') #例外処理へ
name_list =glob.glob('*.csv') #.csvファイルの一覧を取得してname_listへ格納
if len(name_list) == 0: #name_listの中身が空/NULLの時
error('002') #例外処理へ
name_list_counter=0
counter_d=0 #今日の日付が着いたCSVファイルのファイル名があるname_listの配列番号を入れる
jg_cnt = 0 #name_listの文字列にd_todayの文字列が1つも含まれない or 2つ以上ある場合の判定識別子
for f in name_list:#name_listの配列数分for分を回す
if str(d_today) in name_list[name_list_counter]:#今日の日付とname_listのファイル名の比較
counter_d=name_list_counter #一致した場合はcounter_dに配列番号を記憶
jg_cnt = jg_cnt + 1
name_list_counter = name_list_counter +1
print("jg_cntの値:%d" %jg_cnt)
if jg_cnt == 0: #name_listの文字列にd_todayの文字列が1つも含まれない場合
error('011') #例外処理へ
elif jg_cnt >= 2: #name_listの文字列にd_todayの文字列が2つ以上含まれる場合
error('012') #例外処理へ
- 1,2行目
ライブラリの利用宣言
ファイルパスを扱うためのライブラリ、およびシステムの現在日時を扱うためのライブラリ。
どちらもpythonの標準ライブラリ。
以降の処理で関数を扱う時に最初に宣言が必要なもの。 - 6〜8行目
本日の日付を読み込む
d_today の変数へ本日の日付をロードする。
形式は「YYYY-MM-DD」の形式で読み込む。
※変数の型は文字列string型で扱う場合は宣言が必要。
len関数にて文字列の長さを比較し、ロードがうまく行っていない場合にはerror関数へエラーコードを返す仕様とした。
error関数はプログラムの冒頭に宣言している。 - 10〜12行目
現在のカレントディレクトリ内にある拡張子「csv」のファイル名を全て読み込む
glob関数を使ってcsv拡張子のファイル名を全てname_list配列へ格納
len関数で配列が空の場合に例外処理error関数を実行。 - 14〜26行目
読み込んだCSVファイル名に“本日の日付を含む”ファイル名をサーチ
counter_d配列はname_listの配列番号を指定するための変数
jg_cnt変数はname_list配列のファイル名判定のエラー処理用の変数
18行目の判定構文 if 変数1 in 変数2
変数2に変数1を含む場合true,含まない場合false。
ここでYYYY-MM-DDの文字列を含むファイル名を見つけてtrueの場合に、
counter_dにname_listの配列番号を記憶させる。
同時に、jg_cntを+1する。
ファイル名の部分一致が1回しかでないのを基本として、
0回or2回以上出る場合には、読み込むべきcsvファイルの判別がつかないため、
error関数を実行してプログラムを停止させる。
CSVファイルの読み込み部分の説明
host_name =[] #CSVの1列目のHOSTNAMEを格納する用
Bridge_Domain = [] #CSVの3列目のBridge Domainを格納する用
##1回目のファイルロード時は必要なパラメータを変数へ代入する
filename = name_list[counter_d] #開くcsvファイル名を入れる
try:
with open(filename,encoding='utf8',newline='')as f: #./test.csvというファイルを移行"f"というオブジェクトに置き換えて利用
csvreader = csv.reader(f)
counter = 0 #csvファイルの読み出し行数をカウントしておくための変数
for row in csvreader:
# Bridge_Domain[counter][1] = row[1]
# Bridge_Domain[counter][2] = row[2]
# print('Bridge_domain[%d][1]= %s' %(counter,Bridge_Domain[counter][1]))
# print('Bridge_domain[%d][2]= %s' %(counter,Bridge_Domain[counter][2]))
# print('row[1]=%s'%row[1])
host_name.append(row[0])
Bridge_Domain.append(row[2])
print('host_name[%d]=%s'%(counter,host_name[counter]))
print('Bridge_Domain[%d]=%s'%(counter,Bridge_Domain[counter]))
counter = counter +1
# print(counter)
except FileNotFoundError as e:
print(e)
error('021') #例外処理へ
except csv.Error as e:
print(e)
error('022') #例外処理へ
- 1、2行目
CSVファイルのカラムをロードしてくるための配列宣言
pythonの場合は配列(リスト)数の指定をしなくてもOKで楽ちん。
初期化は 配列名 = [] でOK - 7行目〜8行目
CSVファイルのロード処理
pyhonの標準ライブラリを使うのでコード先頭にimport csv を宣言する必要あり。
※標準ではないpandasライブラリを使うやり方もあるが今回は標準のものを採用。
withブロックを使うことで、open()メソッドで開いたファイルをclose()メソッドで閉じることを自動でやってくれる。
open(第1引数,第2引数,台)メソッドwith open() as xxx:
のxxx
には任意の名前を使用できる。open()
でオープンしたファイルオブジェクトにxxx
という名前をつけてブロック内で使用するイメージ。f
が使われることが多いが、他の名前でも問題ない。
コンストラクタcsv.reader()
の第一引数にopen()
で開いたファイルオブジェクトを指定する。csv.reader
オブジェクトは行を反復処理するイテレータとみなせるため、for文で行ごとのデータを取得できる。
コメント