Fluentdのプラグインを作成してみる(練習用)
前回までのあらすじ
Fluentdというログ収集ツールを使ってApacheのログを取得するまで - kk_Atakaの日記 で、confファイルのコメントアウトを外してとりあえず動くっていうところまではいけた。今度は簡単なプラグインを作ってみる。
ひたすら何かを吐き続けるプラグイン
Inputプラグインの場合
Fluent::Inputクラスを継承する。
class SimpleInput < Fluent::Input # 第一引数がプラグインの名前、<source> typeに指定される Fluent::Plugin.register_input("simple_in", self) def start @thread = Thread.new(&method(:run)) end def run loop do # emitメソッドの第一引数が <match **.**> の**に該当すればその形式で出力される # fluent.confでdebug.**はstdoutで出力すると定義されている Fluent::Engine.emit("debug.debug", Fluent::Engine.now, {"simple" => "debudebu"}) # fluent.confに<match simple.output> type simple_outを定義したので、以下のOutput形式で出力される Fluent::Engine.emit("simple.output", Fluent::Engine.now, {"simple" => "simout"}) sleep(1) end end end
上のソースには書かれていないけど、他にもメソッドがいくつか。(公式サイトより)
- configure メソッド
- スタート前に呼び出される
- confハッシュにパラメータを入れられる?
- エラーはFluent::ConfigErrorを投げる
- start メソッド
- スタート時に呼ばれる
- ここでスレッドを作ったりファイルをオープンしたり
- shutdown メソッド
- シャットダウン時に呼ばれる
- スレッドやファイルのクローズはここで。
- イベントをsubmitするためにはFluent::Engine.emit(tag, time, record)メソッドを使う
- tagはString、timeはUnixTime、recordはハッシュ。
Outputプラグインの場合
まず種類がいろいろ。
- Buffered output plugins Fluent::BufferedOutputクラスを継承
- Time sliced output plugins Fluent::TimeSlicedOutputクラスを継承/Buffered output pluginを継承したプラグイン?
- Non-buffered output plugins Fluent::Outputクラスを継承
今回はFluent::Outputを使った。
class SimpleOutput < Fluent::Output # 第一引数がプラグインの名前、<match> の typeに指定される Fluent::Plugin.register_output("simple_out", self) def emit(tag, es, chain) chain.next es.each do |time, record| # 出力内容 puts "simple_out: #{time} - #{record}" end end end
- configure, start, shutdown はInputプラグイン同様にある
- emit メソッドはイベントに到達した場合呼ばれる
- esはFluent::EventStreamオブジェクト(イベントが入ってる?)
- eachで回すとイベントを検索できる。
- chainはトランザクションメッセージのオブジェクト。
- 適切なポイントでchain.nextを呼ぶとエラーを吐いたときにrollbackしてくれる。
- esはFluent::EventStreamオブジェクト(イベントが入ってる?)
Configファイル
# Inputプラグイン指定 <source> type simple_in </source> # Fluent::Engine.emit("simple.output", Fluent::Engine.now, {"simple" => "simout"}) # simple.outputタグ指定した方はsimple_output(Outputプラグイン)出力にマッチする <match simple.output> type simple_out </match> # ## match tag=debug.** and dump to console # Fluent::Engine.emit("debug.debug", Fluent::Engine.now, {"simple" => "debudebu"}) # debug.debugタグ指定した方はstdout出力にマッチする <match debug.**> type stdout </match>
これを実行すると…。
$ bundle exec fluentd -c /home/kk_Ataka/fluentd/fluent.conf -p /home/kk_Ataka/fluentd/plugin/ 2012-08-17 22:19:41 +0900: starting fluentd-0.10.24 2012-08-17 22:19:41 +0900: reading config file path="/home/kk_Ataka/fluentd/fluent.conf" (略) 2012-08-17 22:19:41 +0900: following tail of /usr/local/apache2/logs/access_log 2012-08-17 22:19:41 +0900: following tail of /usr/local/apache2/logs/error_log 2012-08-17 22:19:41 +0900 debug.debug: {"simple":"debudebu"} simple_out: 1345209581 - {"simple"=>"simout"} 2012-08-17 22:19:42 +0900 debug.debug: {"simple":"debudebu"} simple_out: 1345209582 - {"simple"=>"simout"} 2012-08-17 22:19:43 +0900 debug.debug: {"simple":"debudebu"} simple_out: 1345209583 - {"simple"=>"simout"} 2012-08-17 22:19:44 +0900 debug.debug: {"simple":"debudebu"} simple_out: 1345209584 - {"simple"=>"simout"} 2012-08-17 22:19:45 +0900 debug.debug: {"simple":"debudebu"} simple_out: 1345209585 - {"simple"=>"simout"} 2012-08-17 22:19:46 +0900 debug.debug: {"simple":"debudebu"} simple_out: 1345209586 - {"simple"=>"simout"}
1秒刻みでプラグインに記述した内容が出力される。こっちが標準のstdout出力。
2012-08-17 22:19:46 +0900 debug.debug: {"simple":"debudebu"}
こっちが自分で記述したOutputプラグイン書式の出力。
simple_out: 1345209586 - {"simple"=>"simout"}