244 lines
5.8 KiB
Elixir
244 lines
5.8 KiB
Elixir
defmodule Elstat.Error.Service do
  @moduledoc """
  Exception struct carrying a single `:message` field.

  When no message is supplied, the field defaults to `"default message"`.
  """

  defexception [message: "default message"]
end
|
|
|
|
defmodule Elstat.ServiceWorker do
  @moduledoc """
  Polling worker for a single service.

  A worker repeatedly calls `check/1` on the service's adapter, sends the
  result to the globally registered `Elstat.Manager` process as
  `{:service_info, {service_id, result}}`, sleeps for `service.poll` seconds
  and starts over.
  """

  require Logger

  @doc """
  Spawns a worker process for `service` and returns `{:ok, pid}`.

  `service` must expose the `:id`, `:adapter`, `:adapter_opts` and `:poll`
  fields read by `worker_func/1`.

  The process is linked to the caller, as a supervisor requires from the
  start function of a child spec; without the link a crashed worker would
  never be noticed or restarted.
  """
  def start_link(service) do
    Logger.info "spawning worker for #{inspect service.id}"

    worker_pid = spawn_link(fn -> worker_func(service) end)

    {:ok, worker_pid}
  end

  @doc """
  Runs the polling loop for `service`. Never returns.

  Each iteration calls `service.adapter.check(service.adapter_opts)`, forwards
  the result to the manager and sleeps `service.poll * 1000` milliseconds.
  """
  def worker_func(service) do
    adapter = service.adapter

    result = adapter.check(service.adapter_opts)
    Logger.info "result from #{inspect adapter} #{inspect service.id}: #{inspect result}"

    # :global.whereis_name/1 returns :undefined when nothing is registered
    # under the name; send/2 to that atom would raise and kill the loop, so
    # the result is dropped and the next poll tries again.
    case :global.whereis_name(Elstat.Manager) do
      :undefined ->
        Logger.error "manager not registered, dropping result for #{inspect service.id}"

      man_pid ->
        send man_pid, {:service_info, {service.id, result}}
    end

    Process.sleep(service.poll * 1000)
    worker_func(service)
  end
end
|
|
|
|
defmodule Elstat.Manager do
  @moduledoc """
  Elstat's service manager.

  A GenServer registered globally as `Elstat.Manager`. Its state is a map with
  two keys:

    * `:services` - the `:services` map read from the `:elstat` application
      environment
    * `:serv_state` - the latest successful check result per service id

  Service workers report results with `{:service_info, {service_id, result}}`
  messages. Successful results are kept in memory and inserted into a
  per-service table of the `elstat.db` SQLite database.
  """

  use GenServer
  require Logger

  # Sqlitex is handed the database path as a charlist.
  @db_path ~c"elstat.db"

  # client API

  @doc """
  Starts the manager, registered globally as `Elstat.Manager`.

  `opts` are appended to the GenServer options.
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, :ok, [name: {:global, Elstat.Manager}] ++ opts)
  end

  @doc """
  Builds the list of children: the manager followed by one worker child spec
  per configured service.

  Raises if `:services` is not set in the `:elstat` application environment.
  """
  def build_services() do
    services = Application.fetch_env!(:elstat, :services)

    # each service worker will enter an infinite loop
    # polling the service (via an adapter) and giving
    # information back to the manager, so it can
    # process and show those via its API.
    workers =
      Enum.map(services, fn {service_id, service} ->
        %{
          id: service_id,
          start: {Elstat.ServiceWorker, :start_link, [Map.put(service, :id, service_id)]}
        }
      end)

    # spawn the manager alongside service workers
    [Elstat.Manager | workers]
  end

  @doc """
  Returns a map of service id to its latest data, each entry carrying the
  service's `:description`.
  """
  def get_current_state() do
    GenServer.call({:global, Elstat.Manager}, {:get_current})
  end

  @doc """
  Returns a map of service id to a list of `[timestamp, latency]` pairs for
  every reporting service whose adapter declares a `:latency` column.
  """
  def get_graph_state() do
    GenServer.call({:global, Elstat.Manager}, {:get_graph})
  end

  @doc """
  Maps each known column atom to its SQL column definition.
  """
  def get_columns_def do
    %{
      timestamp: "timestamp bigint",
      status: "status bool",
      latency: "latency bigint",
    }
  end

  @doc """
  Maps each known column atom to its SQL column name.
  """
  def get_columns_name do
    %{
      timestamp: "timestamp",
      status: "status",
      latency: "latency",
    }
  end

  @doc """
  Builds the parameterized `INSERT` statement for `service_id`.

  Columns come from the `db_columns` of the service adapter's `adapter_spec`;
  one `$n` placeholder is emitted per column. An empty column list yields
  empty column and placeholder lists.
  """
  def build_insert_query(service_id, state) do
    db_columns = state.services[service_id].adapter.adapter_spec().db_columns

    columns_str = Enum.map_join(db_columns, ",", &Atom.to_string/1)

    # with_index/2 instead of 1..count: for an empty list the range 1..0
    # counts downwards and would produce "$1,$0".
    query_args_str =
      db_columns
      |> Enum.with_index(1)
      |> Enum.map_join(",", fn {_column, num} -> "$#{num}" end)

    """
    INSERT INTO #{Atom.to_string service_id} (#{columns_str})
    VALUES (#{query_args_str})
    """
  end

  # server callbacks

  # Creates one table per configured service (if missing) and starts with an
  # empty serv_state.
  @impl true
  def init(:ok) do
    services = Application.fetch_env!(:elstat, :services)

    Sqlitex.with_db(@db_path, fn db ->
      Enum.each(services, fn {service_id, service} ->
        create_service_table(db, service_id, service)
      end)
    end)

    {:ok, %{services: services, serv_state: %{}}}
  end

  @impl true
  def handle_call({:get_current}, _from, state) do
    reply =
      Map.new(state.serv_state, fn {key, data} ->
        {key, with_description(data, state.services[key].description)}
      end)

    {:reply, reply, state}
  end

  def handle_call({:get_graph}, _from, state) do
    graph_reply =
      state.serv_state
      |> Map.keys()
      |> Enum.filter(&tracks_latency?(state.services[&1]))
      |> Map.new(fn key -> {key, fetch_latency_points(key)} end)

    Logger.debug "graph reply: #{inspect graph_reply}"

    {:reply, graph_reply, state}
  end

  # Successful check: remember the data and persist one row for it.
  @impl true
  def handle_info({:service_info, {sid, {:ok, actual_data}}}, state) do
    new_serv_state = Map.put(state.serv_state, sid, actual_data)

    query = build_insert_query(sid, state)

    adapter = state.services[sid].adapter
    adp_args = adapter.transform_val(actual_data)

    timestamp = :erlang.system_time(:millisecond)

    Logger.debug "query: #{query}, timestamp: #{timestamp}, adp args: #{inspect adp_args}"

    # NOTE(review): the timestamp is bound first, which presumably relies on
    # :timestamp being the first entry of every adapter's db_columns - confirm.
    res =
      Sqlitex.with_db(@db_path, fn db ->
        Sqlitex.query(db, query, bind: [timestamp | adp_args])
      end)

    case res do
      {:ok, _} ->
        Logger.debug "insert for #{inspect sid}: #{inspect res}"

      _ ->
        Logger.error "failed to store result for #{inspect sid}: #{inspect res}"
    end

    {:noreply, %{state | serv_state: new_serv_state}}
  end

  # Failed check: log it and keep the state untouched. A GenServer callback
  # must still return a :noreply tuple, otherwise the manager stops with a
  # bad return value.
  def handle_info({:service_info, {sid, {:error, err}}}, state) do
    Logger.warn "error on #{inspect sid}: #{inspect err}"

    {:noreply, state}
  end

  # private helpers

  # Issues CREATE TABLE IF NOT EXISTS for one service, using the column
  # definitions selected by the adapter's db_columns.
  defp create_service_table(db, service_id, service) do
    columns = get_columns_def()

    column_res =
      Enum.map_join(service.adapter.adapter_spec().db_columns, ",\n", &columns[&1])

    query = """
    CREATE TABLE IF NOT EXISTS #{Atom.to_string service_id} (
    #{column_res}
    );
    """

    Logger.debug "query for #{inspect service_id}: #{query}"

    case Sqlitex.query(db, query) do
      {:ok, _} ->
        Logger.info "created table for #{inspect service_id}"

      {:error, _err} ->
        Logger.error "error making table for #{inspect service_id}"
    end
  end

  # Adds the service description to the stored data, which is tagged as
  # either {:map, map} or {:bool, boolean}.
  defp with_description({:map, data_map}, desc) do
    Map.put(data_map, :description, desc)
  end

  defp with_description({:bool, data_bool}, desc) do
    %{status: data_bool, description: desc}
  end

  # True when the service's adapter declares a :latency column.
  defp tracks_latency?(service) do
    :latency in service.adapter.adapter_spec().db_columns
  end

  # Reads the 50 newest rows of a service table as [timestamp, latency]
  # pairs. A failing query fails the match and crashes the call.
  defp fetch_latency_points(service_id) do
    query = """
    SELECT timestamp, latency FROM #{Atom.to_string service_id}
    ORDER BY timestamp DESC
    LIMIT 50
    """

    Logger.debug "query for latency: #{query}"

    {:ok, rows} = Sqlitex.with_db(@db_path, fn db -> Sqlitex.query(db, query) end)

    Enum.map(rows, fn row ->
      [Keyword.get(row, :timestamp), Keyword.get(row, :latency)]
    end)
  end
end
|