class Fluent::SecureForwardOutput::Node

Attributes

authentication[RW]
detach[RW]
expire[R]
first_session[RW]
host[RW]
hostlabel[RW]
keepalive[RW]
password[RW]
port[RW]
shared_key[RW]
shared_key_salt[RW]
socket[RW]
sslsession[RW]
standby[RW]
state[RW]
unpacker[RW]
username[RW]

Public Class Methods

new(sender, conf) click to toggle source
# File lib/fluent/plugin/output_node.rb, line 21
def initialize(sender, conf)
  @sender = sender
  @shared_key = conf.shared_key || sender.shared_key

  @host = conf.host
  @port = conf.port
  @hostlabel = conf.hostlabel || conf.host
  @username = conf.username
  @password = conf.password
  @standby = conf.standby

  @proxy_uri = conf.proxy_uri

  @keepalive = sender.keepalive
  @connection_hard_timeout = sender.connection_hard_timeout

  @authentication = nil

  @writing = false

  @expire = nil
  @first_session = false
  @detach = false

  @socket = nil
  @sslsession = nil
  @unpacker = MessagePack::Unpacker.new

  @shared_key_salt = generate_salt
  @state = :helo
  @mtime = Time.now
  @thread = nil
end

Public Instance Methods

check_helo(message) click to toggle source
# File lib/fluent/plugin/output_node.rb, line 132
def check_helo(message)
  log.debug "checking helo"
  # ['HELO', options(hash)]
  unless message.size == 2 && message[0] == 'HELO'
    return false
  end
  opts = message[1]
  @shared_key_nonce = opts['nonce'] || '' # make shared_key_check failed (instead of error) if protocol version mismatch exist
  @authentication = opts['auth']
  @allow_keepalive = opts['keepalive']
  @mtime = Time.now
  true
end
check_pong(message) click to toggle source
# File lib/fluent/plugin/output_node.rb, line 162
def check_pong(message)
  log.debug "checking pong"
  # ['PONG', bool(authentication result), 'reason if authentication failed',
  #  self_hostname, sha512\_hex(salt + self_hostname + nonce + sharedkey)]
  unless message.size == 5 && message[0] == 'PONG'
    return false, 'invalid format for PONG message'
  end
  _pong, auth_result, reason, hostname, shared_key_hexdigest = message

  unless auth_result
    return false, 'authentication failed: ' + reason
  end

  if hostname == @sender.self_hostname
    return false, 'same hostname between input and output: invalid configuration'
  end

  clientside = Digest::SHA512.new.update(@shared_key_salt).update(hostname).update(@shared_key_nonce).update(@shared_key).hexdigest
  unless shared_key_hexdigest == clientside
    return false, 'shared key mismatch'
  end

  @mtime = Time.now
  return true, nil
end
connect() click to toggle source
# File lib/fluent/plugin/output_node.rb, line 226
def connect
  Thread.current.abort_on_exception = true
  log.debug "starting client"

  begin
    addr = @sender.hostname_resolver.getaddress(@host)
    log.debug "create tcp socket to node", host: @host, address: addr, port: @port
  rescue => e
    log.warn "failed to resolve the hostname", error_class: e.class, error: e, host: @host
    @state = :failed
    return
  end

  begin
    if @proxy_uri.nil? then
      sock = TCPSocket.new(addr, @port)
    else
      proxy = Proxifier::Proxy(@proxy_uri)
      sock = proxy.open(addr, @port)
    end
  rescue => e
    log.warn "failed to connect for secure-forward", error_class: e.class, error: e, host: @host, address: addr, port: @port
    @state = :failed
    return
  end

  log.trace "changing socket options"
  opt = [1, @sender.send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)

  opt = [@sender.send_timeout.to_i, 0].pack('L!L!')  # struct timeval
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)

  log.trace "initializing SSL contexts"

  context = OpenSSL::SSL::SSLContext.new(@sender.ssl_version)

  log.trace "setting SSL verification options"

  if @sender.secure
    # inject OpenSSL::SSL::SSLContext::DEFAULT_PARAMS
    # https://bugs.ruby-lang.org/issues/9424
    context.set_params({})

    if @sender.ssl_ciphers
      context.ciphers = @sender.ssl_ciphers
    else
      ### follow httpclient configuration by nahi
      # OpenSSL 0.9.8 default: "ALL:!ADH:!LOW:!EXP:!MD5:+SSLv2:@STRENGTH"
      context.ciphers = "ALL:!aNULL:!eNULL:!SSLv2" # OpenSSL >1.0.0 default
    end

    log.trace "set verify_mode VERIFY_PEER"
    context.verify_mode = OpenSSL::SSL::VERIFY_PEER
    if @sender.enable_strict_verification
      context.cert_store = OpenSSL::X509::Store.new
      begin
        context.cert_store.set_default_paths
      rescue OpenSSL::X509::StoreError => e
        log.warn "faild to load system default certificates", error: e
      end
    end
    if @sender.ca_cert_path
      log.trace "set to use private CA", path: @sender.ca_cert_path
      context.ca_file = @sender.ca_cert_path
    end
  end

  log.debug "trying to connect ssl session", host: @host, address: addr, port: @port
  begin
    sslsession = OpenSSL::SSL::SSLSocket.new(sock, context)
    log.trace "connecting...", host: @host, address: addr, port: @port
    sslsession.connect
    @mtime = Time.now
  rescue => e
    log.warn "failed to establish SSL connection", error_class: e.class, error: e, host: @host, address: addr, port: @port
    @state = :failed
    return
  end

  log.debug "ssl session connected", host: @host, port: @port

  begin
    if @sender.enable_strict_verification
      log.debug "checking peer's certificate", subject: sslsession.peer_cert.subject
      sslsession.post_connection_check(@hostlabel)
      verify = sslsession.verify_result
      if verify != OpenSSL::X509::V_OK
        err_name = Fluent::SecureForwardOutput::OpenSSLUtil.verify_result_name(verify)
        log.warn "BUG: failed to verify certification while connecting host #{@host} as #{@hostlabel} (but not raised, why?)"
        log.warn "BUG: verify_result: #{err_name}"
        raise RuntimeError, "BUG: failed to verify certification and to handle it correctly while connecting host #{@host} as #{@hostlabel}"
      end
    end
  rescue OpenSSL::SSL::SSLError => e
    log.warn "failed to verify certification while connecting ssl session", host: @host, hostlabel: @hostlabel
    self.shutdown
    raise
  end

  log.debug "ssl session connected", host: @host, port: @port
  @socket = sock
  @sslsession = sslsession

  buf = ''
  read_length = @sender.read_length
  read_interval = @sender.read_interval
  socket_interval = @sender.socket_interval

  @mtime = Time.now

  loop do
    break if @detach
    break if @connection_hard_timeout && Time.now > @mtime + @connection_hard_timeout

    begin
      while @sslsession.read_nonblock(read_length, buf)
        if buf == ''
          sleep read_interval
          next
        end
        @unpacker.feed_each(buf, &method(:on_read))
        @mtime = Time.now
        buf = ''
      end
    rescue OpenSSL::SSL::SSLError => e
      # to wait i/o restart
      log.trace "SSLError", error_class: e.class, error: e, mtime: @mtime, host: @host, port: @port
      if @connection_hard_timeout && Time.now > @mtime + @connection_hard_timeout
        log.warn "connection hard timeout", mtime: @mtime, timeout: @connection_hard_timeout, host: @host, port: @port
        log.warn "aborting connection", host: @host, port: @port
        self.release!
        self.detach!
        break
      else
        sleep socket_interval
      end
    rescue SystemCallError => e
      log.warn "disconnected by Error", error_class: e.class, error: e, host: @host, port: @port
      self.release!
      self.detach!
      break
    rescue EOFError
      log.warn "disconnected", host: @host, port: @port
      self.release!
      self.detach!
      break
    end
  end
  while @writing
    break if @detach

    sleep read_interval
  end
  self.shutdown
end
detach!() click to toggle source
# File lib/fluent/plugin/output_node.rb, line 71
def detach!
  @detach = true
end
detached?() click to toggle source
# File lib/fluent/plugin/output_node.rb, line 75
def detached?
  @detach
end
dup() click to toggle source
# File lib/fluent/plugin/output_node.rb, line 59
def dup
  renewed = self.class.new(
    @sender,
    Fluent::Config::Section.new({host: @host, port: @port, hostlabel: @hostlabel, username: @username, password: @password, shared_key: @shared_key, standby: @standby, proxy_uri: @proxy_uri})
  )
  renewed
end
established?() click to toggle source
# File lib/fluent/plugin/output_node.rb, line 116
def established?
  @state == :established
end
expired?() click to toggle source
# File lib/fluent/plugin/output_node.rb, line 120
def expired?
  if @keepalive.nil? || @keepalive == 0
    false
  else
    @expire && @expire < Time.now
  end
end
generate_ping() click to toggle source
# File lib/fluent/plugin/output_node.rb, line 146
def generate_ping
  log.debug "generating ping"
  # ['PING', self_hostname, sharedkey\_salt, sha512\_hex(sharedkey\_salt + self_hostname + nonce + shared_key),
  #  username || '', sha512\_hex(auth\_salt + username + password) || '']
  shared_key_hexdigest = Digest::SHA512.new.update(@shared_key_salt).update(@sender.self_hostname).update(@shared_key_nonce).update(@shared_key).hexdigest
  ping = ['PING', @sender.self_hostname, @shared_key_salt, shared_key_hexdigest]
  if @authentication != ''
    password_hexdigest = Digest::SHA512.new.update(@authentication).update(@username).update(@password).hexdigest
    ping.push(@username, password_hexdigest)
  else
    ping.push('','')
  end
  @mtime = Time.now
  ping
end
generate_salt() click to toggle source
# File lib/fluent/plugin/output_node.rb, line 128
def generate_salt
  OpenSSL::Random.random_bytes(16)
end
join() click to toggle source
# File lib/fluent/plugin/output_node.rb, line 112
def join
  @thread && @thread.join
end
log() click to toggle source
# File lib/fluent/plugin/output_node.rb, line 55
def log
  @sender.log
end
on_read(data) click to toggle source
# File lib/fluent/plugin/output_node.rb, line 193
def on_read(data)
  log.debug "on_read"
  if self.established?
    #TODO: ACK
    log.warn "unknown packets arrived..."
    return
  end

  case @state
  when :helo
    unless check_helo(data)
      log.warn "received invalid helo message from #{@host}"
      self.shutdown
      return
    end
    send_data generate_ping()
    @mtime = Time.now
    @state = :pingpong
  when :pingpong
    success, reason = check_pong(data)
    unless success
      log.warn "connection refused to #{@host}:" + reason
      self.shutdown
      return
    end
    log.info "connection established to #{@host}" if @first_session
    @state = :established
    @expire = Time.now + @keepalive if @keepalive && @keepalive > 0
    @mtime = Time.now
    log.debug "connection established", host: @host, port: @port, expire: @expire
  end
end
release!() click to toggle source
# File lib/fluent/plugin/output_node.rb, line 88
def release!
  @writing = false
end
send_data(data) click to toggle source
# File lib/fluent/plugin/output_node.rb, line 188
def send_data(data)
  @mtime = Time.now
  @sslsession.write data.to_msgpack
end
shutdown() click to toggle source
# File lib/fluent/plugin/output_node.rb, line 92
def shutdown
  log.debug "shutting down node #{@host}"
  @state = :closed

  if @thread == Thread.current
    @sslsession.close if @sslsession
    @socket.close if @socket
    @thread.kill
  else
    if @thread
      @thread.kill
      @thread.join
    end
    @sslsession.close if @sslsession
    @socket.close if @socket
  end
rescue => e
  log.debug "error on node shutdown #{e.class}:#{e.message}"
end
start() click to toggle source
# File lib/fluent/plugin/output_node.rb, line 67
def start
  @thread = Thread.new(&method(:connect))
end
tain!() click to toggle source
# File lib/fluent/plugin/output_node.rb, line 79
def tain!
  raise RuntimeError, "BUG: taining detached node" if @detach
  @writing = true
end
tained?() click to toggle source
# File lib/fluent/plugin/output_node.rb, line 84
def tained?
  @writing
end