defmodule Elstat.Error.Service do
  @moduledoc """
  Exception with a single `message` field (defaults to `"default message"`).

  Nothing in this file raises it.
  """

  defexception message: "default message"
end

defmodule Elstat.ServiceWorker do
  @moduledoc """
  Polling loop for one service.

  Each worker repeatedly calls `service.adapter.check/1` with
  `service.adapter_opts`, forwards the result to the globally registered
  `Elstat.Manager` as `{:service_info, {service.id, result}}`, and then sleeps
  for `service.poll` seconds.
  """

  require Logger

  @doc """
  Starts the polling process for `service` and returns `{:ok, pid}`.

  The process is linked to the caller. The child specs built by
  `Elstat.Manager.build_services/0` point at this function, and a supervisor
  can only observe and shut down children that are linked to it.
  """
  def start_link(service) do
    Logger.info "spawning worker for #{inspect service.id}"
    worker_pid = spawn_link(fn -> worker_func(service) end)
    {:ok, worker_pid}
  end

  @doc """
  Runs one poll cycle for `service` and then recurses forever.

  Never returns.
  """
  def worker_func(service) do
    adapter = service.adapter
    result = adapter.check(service.adapter_opts)

    Logger.info "result from #{inspect adapter} #{inspect service.id}: #{inspect result}"

    # `:global.whereis_name/1` returns `:undefined` when the manager is not
    # registered; sending to that atom would raise and end the polling loop.
    case :global.whereis_name(Elstat.Manager) do
      :undefined ->
        Logger.warn "manager not registered, dropping result for #{inspect service.id}"

      man_pid ->
        send man_pid, {:service_info, {service.id, result}}
    end

    Process.sleep(service.poll * 1000)
    worker_func(service)
  end
end

defmodule Elstat.Manager do
  @moduledoc """
  Elstat's service manager.

  Keeps the configured services (`:services`) and the latest successful result
  reported for each of them (`:serv_state`), appends every successful result
  to a per-service table in the SQLite file `elstat.db`, and answers the
  `{:get_current}` and `{:get_graph}` calls.
  """

  use GenServer
  require Logger

  # Path handed to Sqlitex for every database access.
  @db_path 'elstat.db'

  # client API

  @doc """
  Starts the manager, registered globally as `Elstat.Manager`.

  `opts` are appended to the GenServer start options.
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, :ok, [name: {:global, Elstat.Manager}] ++ opts)
  end

  @doc """
  Builds the list of children to supervise: the manager itself followed by one
  `Elstat.ServiceWorker` child spec per entry in the `:services` config of the
  `:elstat` application.
  """
  def build_services() do
    services = Application.fetch_env!(:elstat, :services)

    workers =
      Enum.map(services, fn {service_id, service_conf} ->
        service = Map.put(service_conf, :id, service_id)

        # each service worker will enter an infinite loop
        # polling the service (via an adapter) and giving
        # information back to the manager, so it can
        # process and show those via its API.
        %{
          id: service_id,
          start: {Elstat.ServiceWorker, :start_link, [service]}
        }
      end)

    # spawn the manager alongside service workers
    [Elstat.Manager | workers]
  end

  @doc """
  Returns a map of service id to its latest reported data plus `:description`.
  """
  def get_current_state() do
    GenServer.call({:global, Elstat.Manager}, {:get_current})
  end

  @doc """
  Returns a map of service id to a list of `[timestamp, latency]` pairs
  (at most 50, newest first) for every service whose adapter stores `:latency`.
  """
  def get_graph_state() do
    GenServer.call({:global, Elstat.Manager}, {:get_graph})
  end

  @doc """
  SQL column definitions, keyed by the column atoms adapters list in
  `adapter_spec().db_columns`.
  """
  def get_columns_def do
    %{
      timestamp: "timestamp bigint",
      status: "status bool",
      latency: "latency bigint",
    }
  end

  @doc """
  SQL column names, keyed by column atom.
  """
  def get_columns_name do
    %{
      timestamp: "timestamp",
      status: "status",
      latency: "latency",
    }
  end

  @doc """
  Builds the parameterised `INSERT` statement for `service_id`.

  Columns come from the adapter's `db_columns`; one positional placeholder
  (`$1`, `$2`, ...) is emitted per column.
  """
  def build_insert_query(service_id, state) do
    service = state.services[service_id]
    spec = service.adapter.adapter_spec()

    columns_str = Enum.map_join(spec.db_columns, ",", &Atom.to_string/1)

    # with_index/2 (rather than `1..count`) keeps the placeholder list empty
    # for an empty column list instead of walking the descending range 1..0.
    query_args_str =
      spec.db_columns
      |> Enum.with_index(1)
      |> Enum.map_join(",", fn {_column, position} -> "$#{position}" end)

    # Identifiers cannot be bound as parameters, so the table name is
    # interpolated. Service ids originate from the `:services` app config.
    """
    INSERT INTO #{Atom.to_string service_id} (#{columns_str})
    VALUES (#{query_args_str})
    """
  end

  # server callbacks

  @impl true
  def init(:ok) do
    services = Application.fetch_env!(:elstat, :services)

    Sqlitex.with_db(@db_path, fn db ->
      Enum.each(services, fn {service_id, service} ->
        create_table(db, service_id, service)
      end)
    end)

    {:ok, %{
      services: services,
      serv_state: %{},
    }}
  end

  @impl true
  def handle_call({:get_current}, _from, state) do
    reply =
      Map.new(state.serv_state, fn {key, data} ->
        desc = state.services[key].description

        case data do
          {:map, data_map} ->
            {key, Map.put(data_map, :description, desc)}

          {:bool, data_bool} ->
            {key, %{
              status: data_bool,
              description: desc,
            }}
        end
      end)

    {:reply, reply, state}
  end

  def handle_call({:get_graph}, _from, state) do
    graph_reply =
      state.serv_state
      |> Map.keys()
      |> Enum.filter(fn key -> tracks_latency?(state.services[key]) end)
      |> Map.new(fn key -> {key, latest_latency_points(key)} end)

    Logger.debug "graph reply: #{inspect graph_reply}"
    {:reply, graph_reply, state}
  end

  @impl true
  def handle_info({:service_info, {sid, {:ok, actual_data}}}, state) do
    new_serv_state = Map.put(state.serv_state, sid, actual_data)

    query = build_insert_query(sid, state)
    adapter = state.services[sid].adapter
    adp_args = adapter.transform_val(actual_data)

    timestamp = :erlang.system_time(:millisecond)

    Logger.debug "query: #{query}, timestamp: #{timestamp}, adp args: #{inspect adp_args}"

    Sqlitex.with_db(@db_path, fn db ->
      case Sqlitex.query(db, query, bind: [timestamp | adp_args]) do
        {:error, _reason} = res ->
          Logger.error "insert failed for #{inspect sid}: #{inspect res}"

        res ->
          Logger.debug "#{inspect res}"
      end
    end)

    {:noreply, %{state | serv_state: new_serv_state}}
  end

  def handle_info({:service_info, {sid, {:error, err}}}, state) do
    Logger.warn "error on #{inspect sid}: #{inspect err}"

    # A GenServer callback must return a valid tuple. State is left untouched,
    # so the last successful result stays visible.
    {:noreply, state}
  end

  # private helpers

  # Creates the table for one service if it does not exist yet.
  defp create_table(db, service_id, service) do
    spec = service.adapter.adapter_spec()
    columns = get_columns_def()

    column_res = Enum.map_join(spec.db_columns, ",\n", fn column -> columns[column] end)

    query = """
    CREATE TABLE IF NOT EXISTS #{Atom.to_string service_id} (
        #{column_res}
    );
    """

    Logger.debug "query for #{inspect service_id}: #{query}"

    case Sqlitex.query(db, query) do
      {:ok, _} ->
        Logger.info "created table for #{inspect service_id}"

      {:error, _err} ->
        Logger.error "error making table for #{inspect service_id}"
    end
  end

  # True when the service's adapter declares a :latency column.
  defp tracks_latency?(service) do
    Enum.member?(service.adapter.adapter_spec().db_columns, :latency)
  end

  # Reads the 50 newest rows for a service as [timestamp, latency] pairs.
  # Crashes (assertive match) if the query fails.
  defp latest_latency_points(service_id) do
    {:ok, rows} =
      Sqlitex.with_db(@db_path, fn db ->
        query = """
        SELECT timestamp, latency FROM #{Atom.to_string service_id}
        ORDER BY timestamp DESC
        LIMIT 50
        """

        Logger.debug "query for latency: #{query}"

        Sqlitex.query(db, query)
      end)

    Enum.map(rows, fn row ->
      [Keyword.get(row, :timestamp), Keyword.get(row, :latency)]
    end)
  end
end