#!/usr/bin/env ruby

require File.expand_path('../lib/archive_bot/redis', __FILE__)
require File.expand_path('../lib/archive_bot/log_aggregator', __FILE__)
require File.expand_path('../lib/archive_bot/zmq_utils', __FILE__)

require 'yajl'
require 'ffi-rzmq'

##
# Wraps a ZeroMQ socket with operations expected by ArchiveBot::LogAggregator.
class ZeroMQIO
  # socket: the object messages are sent through; it must respond to
  # send_string.
  def initialize(socket)
    @socket = socket
  end

  # Sends +string+ as a single message on the wrapped socket.
  #
  # Returns nil on success. Raises RuntimeError when zmq_ok? rejects the
  # return code of send_string.
  def puts(string)
    return if zmq_ok?(@socket.send_string(string))

    raise 'unable to publish log message'
  end

  # Intentionally does nothing; present only so this object can be flushed
  # like an IO.
  def flush; end
end

# Mixed into the top-level object, so the helpers these modules define can be
# called as bare methods throughout this script, including from inside
# ZeroMQIO. Presumably make_redis comes from ArchiveBot::Redis and
# zmq_ok? / zmq_error_check from ArchiveBot::ZmqUtils -- confirm in lib/.
include ArchiveBot::Redis
include ArchiveBot::ZmqUtils

# ---------------------------------------------------------------------------
# PUBLISH SOCKET
# ---------------------------------------------------------------------------

# Endpoint the PUB socket binds to. The script cannot do anything useful
# without it, so exit immediately when it is missing.
firehose_url = ENV['FIREHOSE_SOCKET_URL']
abort 'FIREHOSE_SOCKET_URL is not set' unless firehose_url

context = ZMQ::Context.create(1)
raise 'unable to create ZeroMQ context' unless context

# ffi-rzmq documents Context#socket as returning nil when the socket cannot
# be allocated. Fail here with a clear message rather than with a
# NoMethodError on the setsockopt call below.
socket = context.socket(ZMQ::PUB)
raise 'unable to create ZeroMQ socket' unless socket

# ZMQ_LINGER is expressed in milliseconds: allow at most 1 ms for unsent
# messages when the socket is closed.
zmq_error_check(socket.setsockopt(ZMQ::LINGER, 1))

unless zmq_ok?(socket.bind(firehose_url))
  raise "unable to bind to #{firehose_url}"
end

# Signal handler: close the socket, terminate the context, then exit with
# status 0. Accepts (and ignores) whatever arguments trap passes in.
cleanup = lambda do |*|
  zmq_error_check(socket.close)
  context.terminate
  exit 0
end

trap('TERM', &cleanup)
trap('INT', &cleanup)

# The aggregator writes through the ZeroMQIO adapter, so everything it emits
# is published on the PUB socket.
agg = ArchiveBot::LogAggregator.new(ZeroMQIO.new(socket))

# ---------------------------------------------------------------------------
# JSON PARSER / REDIS CONNECTION
# ---------------------------------------------------------------------------

# Redis connection. make_redis is not defined in this file (presumably it is
# supplied by the ArchiveBot::Redis mixin -- confirm).
r = make_redis

# JSON parser whose on_parse_complete callback is agg.output, so a parse that
# completes is handed to the aggregator.
# NOTE(review): the local name `p` shadows Kernel#p in the top-level scope.
p = Yajl::Parser.new
p.on_parse_complete = agg.method(:output)

# Fetches the log entries of +job+ whose score is strictly greater than the
# job's 'last_broadcasted_log_entry' value.
#
# job - object indexed by 'log_key' (the sorted-set key) and
#       'last_broadcasted_log_entry' (the exclusive lower score bound).
# r   - object responding to zrangebyscore (the Redis connection).
#
# Returns whatever r.zrangebyscore returns for the range, requested with
# with_scores: true.
def unbroadcasted_log_entries(job, r)
  sorted_set = job['log_key']
  # A leading "(" makes the lower bound exclusive.
  exclusive_floor = "(#{job['last_broadcasted_log_entry']}"

  r.zrangebyscore(sorted_set, exclusive_floor, '+inf', with_scores: true)
end

# ---------------------------------------------------------------------------
# LOOP
# ---------------------------------------------------------------------------

# Each line on stdin is the Redis key of a job hash. For every key: load the
# job, point the aggregator at it, push the job's not-yet-broadcast log
# entries through the JSON parser, then record how far broadcasting got.
$stdin.each_line do |raw_line|
  job_key = raw_line.chomp
  job = r.hgetall(job_key)

  # The aggregator is pointed at the current job before anything is parsed.
  agg.job = job
  agg.ident = job_key

  pending = unbroadcasted_log_entries(job, r)
  next if pending.empty?

  begin
    pending.each do |json, _score|
      begin
        p << json
      rescue Yajl::ParseError # TODO: Why is this necessary?
        # Print the key of the job whose entry failed to parse and carry on
        # with the remaining entries.
        puts job_key
      end
    end
  rescue Errno::EPIPE
    # A broken pipe ends the whole stdin loop; the job's progress marker is
    # not updated in that case.
    break
  end

  # Remember the score of the newest entry just processed so the next pass
  # for this job starts after it.
  newest_entry = pending.last
  r.hmset(job_key,
          'last_broadcasted_log_entry', newest_entry.last,
          'ts', Time.now.to_f)
end
