dcadfd4c76
- add getting a single incident, getting multiple (with pages), and creating an incident - blueprints.streaming: add incident related OP codes - consts: add IncidentType - manager: add incidents channel, add publish_incident - elstat: add snowflake.py
95 lines
2.6 KiB
Python
95 lines
2.6 KiB
Python
"""
|
|
snowflake.py - snowflake helper functions
|
|
|
|
These functions generate discord-like snowflakes.
|
|
File brought in from
|
|
litecord-reference(https://github.com/lnmds/litecord-reference)
|
|
"""
|
|
import time
|
|
import hashlib
|
|
import os
|
|
import base64
|
|
|
|
# encoded in ms
|
|
EPOCH = 1420070400000
|
|
|
|
# internal state
|
|
_generated_ids = 0
|
|
PROCESS_ID = 1
|
|
WORKER_ID = 1
|
|
|
|
Snowflake = int
|
|
|
|
|
|
def get_invite_code() -> str:
|
|
"""Get a random invite code."""
|
|
random_stuff = hashlib.sha512(os.urandom(1024)).digest()
|
|
code = base64.urlsafe_b64encode(random_stuff).decode().replace('=', '5') \
|
|
.replace('_', 'W').replace('-', 'm')
|
|
return code[:6]
|
|
|
|
|
|
def _snowflake(timestamp: int) -> Snowflake:
|
|
"""Get a snowflake from a specific timestamp
|
|
|
|
This function relies on modifying internal variables
|
|
to generate unique snowflakes. Because of that every call
|
|
to this function will generate a different snowflake,
|
|
even with the same timestamp.
|
|
|
|
Arguments
|
|
---------
|
|
timestamp: int
|
|
Timestamp to be feed in to the snowflake algorithm.
|
|
This timestamp has to be an UNIX timestamp
|
|
with millisecond precision.
|
|
"""
|
|
# Yes, using global variables aren't the best idea
|
|
# Maybe we could distribute the work of snowflake generation
|
|
# to actually separated servers? :thinking:
|
|
global _generated_ids
|
|
|
|
# bits 0-12 encode _generated_ids (size 12)
|
|
genid_b = '{0:012b}'.format(_generated_ids)
|
|
|
|
# bits 12-17 encode PROCESS_ID (size 5)
|
|
procid_b = '{0:05b}'.format(PROCESS_ID)
|
|
|
|
# bits 17-22 encode WORKER_ID (size 5)
|
|
workid_b = '{0:05b}'.format(WORKER_ID)
|
|
|
|
# bits 22-64 encode (timestamp - EPOCH) (size 42)
|
|
epochized = timestamp - EPOCH
|
|
epoch_b = '{0:042b}'.format(epochized)
|
|
|
|
snowflake_b = f'{epoch_b}{workid_b}{procid_b}{genid_b}'
|
|
_generated_ids += 1
|
|
|
|
return int(snowflake_b, 2)
|
|
|
|
|
|
def snowflake_time(snowflake: Snowflake) -> float:
|
|
"""Get the UNIX timestamp(with millisecond precision, as a float)
|
|
from a specific snowflake.
|
|
"""
|
|
|
|
# the total size for a snowflake is 64 bits,
|
|
# considering it is a string, position 0 to 42 will give us
|
|
# the `epochized` variable
|
|
snowflake_b = '{0:064b}'.format(snowflake)
|
|
epochized_b = snowflake_b[:42]
|
|
epochized = int(epochized_b, 2)
|
|
|
|
# since epochized is the time *since* the EPOCH
|
|
# the unix timestamp will be the time *plus* the EPOCH
|
|
timestamp = epochized + EPOCH
|
|
|
|
# convert it to seconds
|
|
# since we don't want to break the entire
|
|
# snowflake interface
|
|
return timestamp / 1000
|
|
|
|
|
|
def get_snowflake():
|
|
"""Generate a snowflake"""
|
|
return _snowflake(int(time.time() * 1000))
|