mirror of
https://git.wownero.com/lza_menace/neroswap.git
synced 2024-08-15 01:03:24 +00:00
159 lines
5.4 KiB
Python
159 lines
5.4 KiB
Python
from io import BytesIO
|
|
from base64 import b64encode
|
|
from qrcode import make as qrcode_make
|
|
from datetime import datetime
|
|
from flask import session, redirect, url_for, request, Blueprint, flash, render_template
|
|
from monero.address import address
|
|
from app.models import Swap
|
|
from app.factory import db
|
|
from app.routes.api import perform_conversion
|
|
from app.library.crypto import wow_wallet, xmr_wallet
|
|
from app.library.crypto import wownero, monero
|
|
from app.library.cache import cache
|
|
from app.library.mattermost import post_webhook
|
|
from app import config
|
|
|
|
|
|
bp = Blueprint('swap', 'swap')
|
|
|
|
@bp.route('/swap')
|
|
def create_swap():
|
|
send_amount = request.args.get('send_amount')
|
|
wow_to_xmr = request.args.get('wow_to_xmr', 'true')
|
|
wow_address = request.args.get('wow_address')
|
|
xmr_address = request.args.get('xmr_address')
|
|
|
|
# Check send_amount
|
|
try:
|
|
send_amount = float(send_amount)
|
|
except:
|
|
flash('Invalid send amount provided')
|
|
return redirect('/#swap')
|
|
|
|
# Check send_amount greater than 0
|
|
if not send_amount > 0:
|
|
flash('Invalid send amount provided')
|
|
return redirect('/#swap')
|
|
|
|
# todo - simplify if possible, normalize var conventions
|
|
if wow_to_xmr == 'true':
|
|
wow_amount = send_amount
|
|
xmr_amount = 0
|
|
worth = 'wow_worth'
|
|
wow_to_xmr = True
|
|
gen_subaddress = wow_wallet.new_address
|
|
receive_wallet = wownero
|
|
send_wallet = monero
|
|
fee_as_other = 'wow_to_xmr_fee_as_xmr'
|
|
amount_as_other = 'wow_as_xmr'
|
|
coin = 'WOW'
|
|
elif wow_to_xmr == 'false':
|
|
wow_amount = 0
|
|
xmr_amount = send_amount
|
|
worth = 'xmr_worth'
|
|
wow_to_xmr = False
|
|
gen_subaddress = xmr_wallet.new_address
|
|
receive_wallet = monero
|
|
send_wallet = wownero
|
|
fee_as_other = 'xmr_to_wow_fee_as_wow'
|
|
amount_as_other = 'xmr_as_wow'
|
|
coin = 'XMR'
|
|
else:
|
|
flash('There was a problem processing')
|
|
return redirect('/#swap')
|
|
|
|
# Check addresses
|
|
if not Swap().validate_wow_address(wow_address):
|
|
flash('Invalid WOW address provided.')
|
|
return redirect('/#swap')
|
|
|
|
if not Swap().validate_xmr_address(xmr_address):
|
|
flash('Invalid XMR address provided.')
|
|
return redirect('/#swap')
|
|
|
|
# Get price and conversion data
|
|
conversion = perform_conversion(wow_amount, xmr_amount)
|
|
|
|
# Check swap worth
|
|
sw = Swap().validate_swap_worth(conversion[worth])
|
|
if not sw[0]:
|
|
flash(f'Invalid swap worth: {sw[1]}')
|
|
return redirect('/#swap')
|
|
|
|
# Create new subaddress
|
|
swap_address = gen_subaddress()
|
|
|
|
# Validate subaddress does not exist already
|
|
checks = 0
|
|
address_exists = Swap.query.filter(
|
|
Swap.wow_to_xmr==wow_to_xmr,
|
|
Swap.swap_address_index==swap_address[0]
|
|
).first()
|
|
while address_exists:
|
|
print(f'Found existing address index {swap_address[0]} for {coin}. Generating another.')
|
|
if checks >= 10:
|
|
flash('Something went wrong. Contact the administrator for help.')
|
|
return redirect('/#swap')
|
|
swap_address = gen_subaddress()
|
|
address_exists = Swap.query.filter(
|
|
Swap.wow_to_xmr==wow_to_xmr,
|
|
Swap.swap_address_index==swap_address[0]
|
|
).first()
|
|
checks += 1
|
|
|
|
|
|
# Create swap in database
|
|
send_full_atomic = send_wallet.to_atomic(conversion[amount_as_other])
|
|
send_fee_atomic = send_wallet.to_atomic(conversion[fee_as_other])
|
|
s = Swap(
|
|
date=datetime.utcnow(),
|
|
wow_to_xmr=wow_to_xmr,
|
|
wow_price_ausd=Swap().to_ausd(conversion['wow_price']),
|
|
xmr_price_ausd=Swap().to_ausd(conversion['xmr_price']),
|
|
swap_address=swap_address[1],
|
|
swap_address_index=swap_address[0],
|
|
return_wow_address=wow_address,
|
|
return_xmr_address=xmr_address,
|
|
receive_amount_atomic=receive_wallet.to_atomic(send_amount),
|
|
fee_amount_atomic=send_wallet.to_atomic(conversion[fee_as_other]),
|
|
send_amount_atomic=send_full_atomic - send_fee_atomic
|
|
)
|
|
db.session.add(s)
|
|
db.session.commit()
|
|
url = url_for('swap.get_swap', swap_id=s.id, _external=True)
|
|
details = s.show_details()
|
|
post_webhook(f'New swap [{s.id}]({url}); swap wallets to receive {details["receive"]} and send {details["send"]} ({details["fee"]} fee)')
|
|
return redirect(url)
|
|
|
|
# todo - simplify, use model logic
|
|
@bp.route('/swap/<swap_id>')
|
|
def get_swap(swap_id):
|
|
s = Swap.query.get(swap_id)
|
|
_address_qr = BytesIO()
|
|
if s:
|
|
if s.hours_elapsed(since_completed=True) > 12:
|
|
flash('That swap is completed and no longer available to view')
|
|
return redirect('/')
|
|
if s.wow_to_xmr:
|
|
coin = 'wow'
|
|
full = 'wownero'
|
|
qr_uri = s.swap_address
|
|
else:
|
|
coin = 'xmr'
|
|
full = 'monero'
|
|
qr_uri = f'{full}:{s.swap_address}&tx_description=neroswap_{s.id}'
|
|
address_qr = qrcode_make(qr_uri).save(_address_qr)
|
|
qrcode = b64encode(_address_qr.getvalue()).decode()
|
|
txes = cache.get_transfer_data(coin, s.swap_address_index)
|
|
return render_template('swap.html', swap=s, txes=txes, qrcode=qrcode)
|
|
else:
|
|
flash('That swap ID does not exist.')
|
|
return redirect('/#search')
|
|
|
|
@bp.route('/search')
|
|
def search_swap():
|
|
swap_id = request.args.get('swap_id')
|
|
if not swap_id:
|
|
flash('No swap ID provided')
|
|
return redirect('/#search')
|
|
return redirect(url_for('swap.get_swap', swap_id=swap_id))
|