パスワードを忘れた? アカウント作成
6492810 journal
日記

ninestarsの日記: [AWS][python]第4回 botoから S3 を操作しよう(応用編)

日記 by ninestars

前回までのS3への読み書きは、もちろん他のツールで実現できるので、それほど boto ライブラリを使用する意義はありませんね。では他のツールでは見かけたことが無い、rsync のような帯域制限を行ってみましょう。
大量のログなどを自身のネットワークからからS3へ転送する際、転送そのものの帯域占有が問題になることがあります。ログ転送の帯域を絞り、他への影響を軽減します。
余談ですが、東京リージョンのサービスが始まる前の2011年3月以前、シンガポールやUSへの転送は、そのままでは遅かったため並列数を上げるなど工夫をしていたものです。東京リージョンの開設によって解決したのですが、最近とある件でS3への転送帯域を制限したいとの要望があり、たった1年半で隔世の感を味わうことになったのでした。

さて、前回書き込みに用いた set_contents_from_filename() には、callback関数を与えることができるので、今回はこれを用います。

callback関係で使用する引数は以下の二つです。
    - cb : callback関数名 (default: None)
    - num_cb : callback回数 (default: 10)

また、今回は利用しませんがcallback関数への引数は以下の二つです。
    - (転送済みバイト数, ファイルバイト数)

帯域制御の方法についてはいくつか方法がありますが、ここでは単純に
・ファイルサイズを秒間転送量で案分し、callback回数を算出
・1秒以内に指定バイトを送信したら残り秒数を待つ
・指定バイトの送信に1秒以上使用した場合は待たずに次の指定バイト数を送信する
という戦略を取ることにします。

1秒経過をどう取得するかですが、単純に考えて前回の終了時間を保存し、一定バイト数転送後の時間と比較し1秒未満なら残時間を time.sleep() で待つなどが考えられます。しかしそれはスマートでないのでパス、今回はタイマースレッドとEventを組み合わせて実装します。

以下、ローカルの foo_bar.txt ファイルを s3://foobar/abc/foo_bar.txt へ 200kb/sec で転送するサンプルコードです。

# coding: utf-8

import os
import sys
import time
from threading import Thread, Event

import boto

def wait_cb(dummy1, dummy2):
        ev_wait.wait()
        ev_wait.clear()

def wait_th_run(sleep_sec=1.0):
        while not ev_terminate.isSet():
                time.sleep(sleep_sec)
                ev_wait.set()

if __name__ == '__main__':
        ev_wait = Event() # 転送制御用イベント
        ev_terminate = Event() # 転送制御用スレッド終了用イベント
        wait_th = Thread(target=wait_th_run) # 転送制御用スレッド作成
        wait_th.start()

        fname = './foo_bar.txt'
        filesize = os.stat(fname).st_size # ファイルサイズの取得
        cb_count = int(filesize / (200 * 1024)) # callback回数算出

        conn = boto.connect_s3('accceskey', 'secretkey')
        bucket = conn.get_bucket('foobar')
        s3key = boto.s3.key.Key(bucket)
        s3key.key = 'abc/foo_bar.txt'
        s3key.set_contents_from_filename(fname, cb=wait_cb, num_cb=cb_count)

        ev_terminate.set() # 転送制御用スレッド終了イベントset
        wait_th.join() # 転送制御用スレッド終了を待つ

Event オブジェクトによってどのような挙動となるか、ソースを追ってみてください。

次回はElastic MapReduceを実行する方法について紹介します。

この議論は賞味期限が切れたので、アーカイブ化されています。 新たにコメントを付けることはできません。
typodupeerror

UNIXはただ死んだだけでなく、本当にひどい臭いを放ち始めている -- あるソフトウェアエンジニア

読み込み中...