Add address

This commit is contained in:
Michał Sałaban 2017-11-24 03:05:16 +01:00
commit bbeb7d85a2
6 changed files with 307 additions and 0 deletions

6
.gitignore vendored Normal file
View file

@ -0,0 +1,6 @@
.*
!.gitignore
*.py[co]
*~
*.bak
*.swp

1
monero/__init__.py Normal file
View file

@ -0,0 +1 @@
from .address import Address

69
monero/address.py Normal file
View file

@ -0,0 +1,69 @@
from binascii import hexlify, unhexlify
from sha3 import keccak_256
from . import base58
class Address(object):
def __init__(self, address):
if len(address) != 95:
raise ValueError("Address must be 95 characters long, is %d" % len(address))
self._decode(address)
def _decode(self, address):
self._decoded = unhexlify(base58.decode(address))
checksum = self._decoded[-4:]
if checksum != keccak_256(self._decoded[:-4]).digest()[:4]:
raise ValueError("Invalid checksum")
def is_testnet(self):
return self._decoded[0] in bytes([53, 54])
def get_view_key(self):
return hexlify(self._decoded[33:65]).decode()
def get_spend_key(self):
return hexlify(self._decoded[1:33]).decode()
def with_payment_id(self, payment_id=0):
if isinstance(payment_id, (bytes, str)):
payment_id = int(payment_id, 16)
elif not isinstance(payment_id, int):
raise TypeError("payment_id must be either int or hexadecimal str or bytes")
prefix = 54 if self.is_testnet() else 19
data = bytes([prefix]) + self._decoded[1:65] + payment_id.to_bytes(8, byteorder='big')
checksum = keccak_256(data).digest()[:4]
return IntegratedAddress(base58.encode(hexlify(data + checksum)))
def __repr__(self):
return base58.encode(hexlify(self._decoded))
def __eq__(self, other):
if isinstance(other, Address):
return str(self) == str(other)
if isinstance(other, str):
return str(self) == other
return super()
class IntegratedAddress(Address):
def __init__(self, address):
if len(address) != 106:
raise ValueError("Integrated address must be 106 characters long, is %d" % len(address))
self._decode(address)
def get_payment_id(self):
return hexlify(self._decoded[65:-4]).decode()
def get_base_address(self):
prefix = 53 if self.is_testnet() else 18
data = bytes([prefix]) + self._decoded[1:65]
checksum = keccak_256(data).digest()[:4]
return Address(base58.encode(hexlify(data + checksum)))
def address(addr):
if len(addr) == 95:
return Address(addr)
elif len(addr) == 106:
return IntegratedAddress(addr)
raise ValueError("Address must be either 95 or 106 characters long")

168
monero/base58.py Normal file
View file

@ -0,0 +1,168 @@
# MoneroPy - A python toolbox for Monero
# Copyright (C) 2016 The MoneroPy Developers.
#
# MoneroPy is released under the BSD 3-Clause license. Use and redistribution of
# this software is subject to the license terms in the LICENSE file found in the
# top-level directory of this distribution.
__alphabet = [ord(s) for s in '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz']
__b58base = 58
__UINT64MAX = 2**64
__encodedBlockSizes = [0, 2, 3, 5, 6, 7, 9, 10, 11]
__fullBlockSize = 8
__fullEncodedBlockSize = 11
def _hexToBin(hex):
if len(hex) % 2 != 0:
return "Hex string has invalid length!"
return [int(hex[i*2:i*2+2], 16) for i in range(len(hex)//2)]
def _binToHex(bin):
return "".join([("0" + hex(int(bin[i])).split('x')[1])[-2:] for i in range(len(bin))])
def _strToBin(a):
return [ord(s) for s in a]
def _binToStr(bin):
return ''.join([chr(bin[i]) for i in range(len(bin))])
def _uint8be_to_64(data):
l_data = len(data)
if l_data < 1 or l_data > 8:
return "Invalid input length"
res = 0
switch = 9 - l_data
for i in range(l_data):
if switch == 1:
res = res << 8 | data[i]
elif switch == 2:
res = res << 8 | data[i]
elif switch == 3:
res = res << 8 | data[i]
elif switch == 4:
res = res << 8 | data[i]
elif switch == 5:
res = res << 8 | data[i]
elif switch == 6:
res = res << 8 | data[i]
elif switch == 7:
res = res << 8 | data[i]
elif switch == 8:
res = res << 8 | data[i]
else:
return "Impossible condition"
return res
def _uint64_to_8be(num, size):
res = [0] * size;
if size < 1 or size > 8:
return "Invalid input length"
twopow8 = 2**8
for i in range(size-1,-1,-1):
res[i] = num % twopow8
num = num // twopow8
return res
def encode_block(data, buf, index):
l_data = len(data)
if l_data < 1 or l_data > __fullEncodedBlockSize:
return "Invalid block length: " + str(l_data)
num = _uint8be_to_64(data)
i = __encodedBlockSizes[l_data] - 1
while num > 0:
remainder = num % __b58base
num = num // __b58base
buf[index+i] = __alphabet[remainder];
i -= 1
return buf
def encode(hex):
'''Encode hexadecimal string as base58 (ex: encoding a Monero address).'''
data = _hexToBin(hex)
l_data = len(data)
if l_data == 0:
return ""
full_block_count = l_data // __fullBlockSize
last_block_size = l_data % __fullBlockSize
res_size = full_block_count * __fullEncodedBlockSize + __encodedBlockSizes[last_block_size]
res = [0] * res_size
for i in range(res_size):
res[i] = __alphabet[0]
for i in range(full_block_count):
res = encode_block(data[(i*__fullBlockSize):(i*__fullBlockSize+__fullBlockSize)], res, i * __fullEncodedBlockSize)
if last_block_size > 0:
res = encode_block(data[(full_block_count*__fullBlockSize):(full_block_count*__fullBlockSize+last_block_size)], res, full_block_count * __fullEncodedBlockSize)
return _binToStr(res)
def decode_block(data, buf, index):
l_data = len(data)
if l_data < 1 or l_data > __fullEncodedBlockSize:
return "Invalid block length: " + l_data
res_size = __encodedBlockSizes.index(l_data)
if res_size <= 0:
return "Invalid block size"
res_num = 0
order = 1
for i in range(l_data-1, -1, -1):
digit = __alphabet.index(data[i])
if digit < 0:
return "Invalid symbol"
product = order * digit + res_num
if product > __UINT64MAX:
return "Overflow"
res_num = product
order = order * __b58base
if res_size < __fullBlockSize and 2**(8 * res_size) <= res_num:
return "Overflow 2"
tmp_buf = _uint64_to_8be(res_num, res_size)
for i in range(len(tmp_buf)):
buf[i+index] = tmp_buf[i]
return buf
def decode(enc):
'''Decode a base58 string (ex: a Monero address) into hexidecimal form.'''
enc = _strToBin(enc)
l_enc = len(enc)
if l_enc == 0:
return ""
full_block_count = l_enc // __fullEncodedBlockSize
last_block_size = l_enc % __fullEncodedBlockSize
last_block_decoded_size = __encodedBlockSizes.index(last_block_size)
if last_block_decoded_size < 0:
return "Invalid encoded length"
data_size = full_block_count * __fullBlockSize + last_block_decoded_size
data = [0] * data_size
for i in range(full_block_count):
data = decode_block(enc[(i*__fullEncodedBlockSize):(i*__fullEncodedBlockSize+__fullEncodedBlockSize)], data, i * __fullBlockSize)
if last_block_size > 0:
data = decode_block(enc[(full_block_count*__fullEncodedBlockSize):(full_block_count*__fullEncodedBlockSize+last_block_size)], data, full_block_count * __fullBlockSize)
return _binToHex(data)

1
requirements.txt Normal file
View file

@ -0,0 +1 @@
pysha3

62
test.py Normal file
View file

@ -0,0 +1,62 @@
import unittest
from monero.address import Address, IntegratedAddress, address
class Tests(object):
def test_from_and_to_string(self):
a = Address(self.addr)
self.assertEqual(str(a), self.addr)
self.assertEqual(a.get_spend_key(), self.psk)
self.assertEqual(a.get_view_key(), self.pvk)
ia = IntegratedAddress(self.iaddr)
self.assertEqual(ia.get_payment_id(), self.pid)
self.assertEqual(str(ia), self.iaddr)
self.assertEqual(ia.get_spend_key(), self.psk)
self.assertEqual(ia.get_view_key(), self.pvk)
self.assertEqual(ia.get_base_address(), a)
def test_payment_id(self):
a = Address(self.addr)
ia = a.with_payment_id(self.pid)
self.assertIsInstance(ia, IntegratedAddress)
self.assertEqual(ia.get_payment_id(), self.pid)
self.assertEqual(str(ia), self.iaddr)
def test_recognition_and_comparisons(self):
a = Address(self.addr)
a2 = address(self.addr)
self.assertIsInstance(a2, Address)
self.assertEqual(a, a2)
self.assertEqual(a, self.addr)
self.assertEqual(self.addr, a)
self.assertEqual(a.is_testnet(), self.testnet)
self.assertEqual(a2.is_testnet(), self.testnet)
ia = IntegratedAddress(self.iaddr)
ia2 = address(self.iaddr)
self.assertIsInstance(ia, IntegratedAddress)
self.assertEqual(ia, ia2)
self.assertEqual(ia, self.iaddr)
self.assertEqual(self.iaddr, ia)
self.assertEqual(ia.is_testnet(), self.testnet)
self.assertEqual(ia2.is_testnet(), self.testnet)
self.assertEqual(ia2.get_base_address(), a)
class AddressTestCase(unittest.TestCase, Tests):
addr = '43aeKax1ts4BoEbSyzKVbbDRmc8nsnpZLUpQBYvhUxs3KVrodnaFaBEQMDp69u4VaiEG3LSQXA6M61mXPrztCLuh7PFUAmd'
psk = '33a7ceb933b793408d49e82c0a34664a4be7117243cb77a64ef280b866d8aa6e'
pvk = '96f70d63d9d3558b97a5dd200a170b4f45b3177a274aa90496ea683896ff6438'
pid = '4a6f686e47616c74'
iaddr = '4DHKLPmWW8aBoEbSyzKVbbDRmc8nsnpZLUpQBYvhUxs3KVrodnaFaBEQMDp69u4VaiEG3LSQXA6M61mXPrztCLuhAR6GpL18QNwE8h3TuF'
testnet = False
class TestnetAddressTestCase(AddressTestCase, Tests):
addr = '9u9j6xG1GNu4ghrdUL35m5PQcJV69YF8731DSTDoh7pDgkBWz2LWNzncq7M5s1ARjPRhvGPX4dBUeC3xNj4wzfrjV6SY3e9'
psk = '345b201b8d1ba216074e3c45ca606c85f68563f60d0b8c0bfab5123f80692aed'
pvk = '9deb70cc7e1e23d635de2d5a3086a293b4580dc2b9133b4211bc09f22fadc4f9'
pid = '4a6f686e47616c74'
iaddr = 'A4rQ7m5VseR4ghrdUL35m5PQcJV69YF8731DSTDoh7pDgkBWz2LWNzncq7M5s1ARjPRhvGPX4dBUeC3xNj4wzfrjihS6W83Km1mE7W3kMa'
testnet = True