diff --git a/app/management/commands/desatascar_pendientes.py b/app/management/commands/desatascar_pendientes.py index a414cea8e6ea9d6854164f964a14731c74de07b3..afb6ddfed1dc10137d936f7fc1f658669ad7ee4d 100644 --- a/app/management/commands/desatascar_pendientes.py +++ b/app/management/commands/desatascar_pendientes.py @@ -5,11 +5,11 @@ from django.core.management import BaseCommand from app.managers import EthereumGateway from app.services import TransactionUnstucker from TsaApi.settings import GAS_PRICE +import logging class Command(BaseCommand): - unstucker = TransactionUnstucker() - logger = logging.getLogger('cmd') + unstucker = TransactionUnstucker(logging.getLogger('console-logger')) def handle(self, *args, **options): self.unstucker.unstuck_pending_transactions(GAS_PRICE) diff --git a/app/managers.py b/app/managers.py index 22116cbef105baf4cd262bea1eb4b019fb0488b2..569c9850f214e76056cb75e75eef42c0e6bee89b 100644 --- a/app/managers.py +++ b/app/managers.py @@ -94,6 +94,10 @@ class TimestampManager(models.Manager): return web3.eth.getBlock('latest').number +class TxAlreadySealedInBlock(Exception): + pass + + class EthereumGateway: def transaction_is_canonical(self, transaction): last_block_number = TimestampManager.get_last_block_number() @@ -106,13 +110,20 @@ class EthereumGateway: def pending_transactions(self): w3 = TimestampManager.get_provider() - return w3.eth.pendingTransactions + filt = w3.eth.filter('pending') + tx_hashes = w3.eth.getFilterChanges(filt.filter_id) + # ojo con lo de abajo, es costoso en tiempo, hace len(tx_hashes) conexiones al nodo, si es sobre HTTP es lento + txs = [w3.eth.getTransaction(i.hex()) for i in tx_hashes] + return txs def block(self, block_number): return TimestampManager.get_block(block_number) def resend(self, tx, gas_price=None, gas=None): w3 = TimestampManager.get_provider() + tx_2 = w3.eth.getTransaction(tx['hash']) + if tx_2['blockNumber'] is not None: + raise TxAlreadySealedInBlock("Tx already seald in block {}".format(str(tx_2['blockNumber']))) return w3.eth.resend(tx, gas_price, gas) diff --git a/app/services.py b/app/services.py index 56c518fdabfac70d3e4c01fb90bcdf3358cf73c3..ef5c01ea48ca6e445e44d4984580bc671aa9f63f 100644 --- a/app/services.py +++ b/app/services.py @@ -4,10 +4,17 @@ from json import JSONDecodeError from app.managers import TimestampManager, EthereumGateway, TimeStamp from app.utils import Utils, Base64EncodingService from pymemcache.client import base -from TsaApi.settings import MEMCACHED_HOST,MEMCACHED_PORT, PENDING_TXS_MEMCACHED_KEY +from TsaApi.settings import MEMCACHED_HOST, MEMCACHED_PORT, PENDING_TXS_MEMCACHED_KEY import logging import json +from sys import stdout +if stdout.isatty(): + log_str = 'console-logger' +else: + log_str = 'logger' + +module_logger = logging.getLogger(log_str) class DefinitiveReceiptGenerator: encoder = Base64EncodingService() @@ -97,21 +104,26 @@ class VerifyService: class MemcachedStorage: - content = '' client = base.Client((MEMCACHED_HOST, MEMCACHED_PORT)) + server_tuple = (MEMCACHED_HOST, MEMCACHED_PORT) key = PENDING_TXS_MEMCACHED_KEY - logger = logging.getLogger('logger') + logger = module_logger def store(self, pending_txs): jsoned = json.dumps(pending_txs) self.client.set(self.key, jsoned) def retrieve(self): - retrieved = self.client.get(self.key).decode("utf-8") + retrieved = self.client.get(self.key) try: - interpretado = json.loads(retrieved) + decoded = retrieved.decode("utf-8") + interpretado = json.loads(decoded) + except AttributeError as ae: + self.logger.info("Memcached estaba vacÃo key: {} ; valor : {}".format(str(self.key), retrieved)) + interpretado = [] except JSONDecodeError as jsonde: - self.logger.error("Se devolvera lista vacia. No se pudo decodear la string: {} {}".format(retrieved, str(jsonde))) + self.logger.error( + "Se devolvera lista vacia. No se pudo decodear la string: {} {}".format(retrieved, str(jsonde))) interpretado = [] return interpretado @@ -119,9 +131,12 @@ class MemcachedStorage: class CheckCriteria: storage = MemcachedStorage() + def __init__(self, logger=None): + if logger is not None: + self.logger = logger + def should_transactions_be_resent(self, stuck_txs): stored_txs = self.storage.retrieve() - # return all(elem in txs for elem in stored_txs) and len(txs) >= len(stored_txs) txs_dict = {} for stx in stored_txs: txs_dict[stx['hash']] = stx @@ -136,14 +151,18 @@ class CheckCriteria: class PendingTransactionsService: gateway = EthereumGateway() criteria = CheckCriteria() - logger = logging.getLogger('cmd_logger') + logger = module_logger - def resend_stuck_transactions(self, stuck_tx, new_gas_price, new_gas_limit=None): + def __init__(self, logger=None): + if logger is not None: + self.logger = logger + + def resend_transactions(self, txs, new_gas_price, new_gas_limit=None): self.logger.info("Por evaluar si deberÃan ser reenviadas las transacciones") new_txs = [] failed = [] - if self.criteria.should_transactions_be_resent(stuck_tx): # obs: si es vacÃo entonces evalúa True - for tx in stuck_tx: # si es vacÃo este ciclo no hace nada + if self.criteria.should_transactions_be_resent(txs): # obs: si es vacÃo entonces evalúa True + for tx in txs: # si es vacÃo este ciclo no hace nada self.logger.info("Reenviando transaccion {}".format(str(tx))) try: new_txs.append(self.gateway.resend(tx, new_gas_price, new_gas_limit)) @@ -158,10 +177,22 @@ class TransactionUnstucker: pending_transaction_service = PendingTransactionsService() gateway = EthereumGateway() storage = MemcachedStorage() - logger = logging.getLogger('cmd_logger') + logger = module_logger + + def __init__(self, logger=None): + if logger is not None: + self.logger = logger + self.pending_transaction_service.logger = logger def unstuck_pending_transactions(self, new_gas_price, new_gas=None): + self.logger.info('Arranca proceso') stuck_txs = self.gateway.pending_transactions() - res, failed = self.pending_transaction_service.resend_stuck_transactions(stuck_txs, new_gas_price, new_gas) - self.storage.store(self.gateway.pending_transactions()) + self.logger.debug('Cant txs encontradas : {}'.format(len(stuck_txs))) + res, failed = self.pending_transaction_service.resend_transactions(stuck_txs, new_gas_price, new_gas) + self.logger.debug( + 'Terminado, cant reenvios con exito: {} ; cant reenvios fallados: {}'.format(len(res), len(failed))) + nuevas_txs = self.gateway.pending_transactions() + self.logger.debug('Cant txs encontradas : {}'.format(len(nuevas_txs))) + self.storage.store(nuevas_txs) + self.logger.info('Termina proceso') return res, failed diff --git a/app/tests.py b/app/tests.py index ef6c33de3061a8508fe9e9a4e243bc40b152d4d7..0e7b9b345fe260c3f65baad790d4e8979132d8f9 100644 --- a/app/tests.py +++ b/app/tests.py @@ -12,6 +12,7 @@ from django.test import Client from eth_account.datastructures import AttributeDict from pymemcache.client import base from rest_framework import status +from TsaApi.settings import TEST_MEMCACHED_HOST, TEST_MEMCACHED_PORT, TEST_PENDING_TXS_MEMCACHED_KEY from app.services import PendingTransactionsService, CheckCriteria, MemcachedStorage, TransactionUnstucker @@ -220,7 +221,7 @@ class PendingTransactionTest(TestCase, EthereumMocks): self.service.criteria = self.CriteriaMockTrue() self.service.gateway = self.EthereumMock() mock_txs = self.current_txs - result, failed = self.service.resend_stuck_transactions(mock_txs, 1000, ) + result, failed = self.service.resend_transactions(mock_txs, 1000, ) self.assertEqual(result, mock_txs) self.assertEqual(failed, []) @@ -228,7 +229,7 @@ class PendingTransactionTest(TestCase, EthereumMocks): self.service.criteria = self.CriteriaMockFalse() self.service.gateway = self.EthereumMock() mock_txs = self.current_txs - result, failed = self.service.resend_stuck_transactions(mock_txs, 1000, ) + result, failed = self.service.resend_transactions(mock_txs, 1000, ) self.assertEqual(result, []) self.assertEqual(failed, []) @@ -238,7 +239,7 @@ class PendingTransactionTest(TestCase, EthereumMocks): self.service.criteria = criteria self.service.gateway = self.EthereumMock() txs = self.current_txs - result, failed = self.service.resend_stuck_transactions(txs, 1000, ) + result, failed = self.service.resend_transactions(txs, 1000, ) self.assertEqual(result, txs) self.assertEqual(failed, []) @@ -248,7 +249,7 @@ class PendingTransactionTest(TestCase, EthereumMocks): self.service.criteria = criteria self.service.gateway = self.FailResender() txs = self.current_txs - result, failed = self.service.resend_stuck_transactions(txs, 1000, ) + result, failed = self.service.resend_transactions(txs, 1000, ) self.assertEqual(result, []) self.assertEqual(failed, txs) @@ -258,7 +259,7 @@ class PendingTransactionTest(TestCase, EthereumMocks): self.service.criteria = criteria self.service.gateway = self.HalfResender() txs = self.current_txs - result, failed = self.service.resend_stuck_transactions(txs, 1000, ) + result, failed = self.service.resend_transactions(txs, 1000, ) self.assertEqual(result, txs[:2]) self.assertEqual(failed, [txs[-1]]) @@ -272,6 +273,9 @@ class MemcachedStorageTest(TestCase): class MemcachedMock: dictionary = {} + def __init__(self, some=None): # necesario para el monkey patching que hice en un test + pass + def set(self, key, value, expire_time=0): self.dictionary[key] = str(value).encode() return value @@ -281,18 +285,21 @@ class MemcachedStorageTest(TestCase): def test_mock_memcached(self): old_client = self.storage.client + old_mem_client = base.Client # perdon por el monkey patching :c self.storage.client = self.MemcachedMock() - txs = [123, 345, 78, 9] - self.storage.store(txs) + base.Client = self.MemcachedMock + some_stuff = [123, 345, 78, 9] + self.storage.store(some_stuff) retrieved = self.storage.retrieve() - self.assertEqual(txs, retrieved) + self.assertEqual(some_stuff, retrieved) self.storage.client = old_client + base.Client = old_mem_client def test_integrated_memcached(self): - m_client = base.Client(('172.17.0.1', 11211)) - m_client.set('tst_pending_txs', []) + m_client = base.Client((TEST_MEMCACHED_HOST, TEST_MEMCACHED_PORT)) + m_client.set(TEST_PENDING_TXS_MEMCACHED_KEY, []) self.storage.client = m_client - self.storage.key = 'tst_pending_txs' + self.storage.key = TEST_PENDING_TXS_MEMCACHED_KEY some_stuff = [1, 2, 3, 4, 54, 5, 765] self.storage.store(some_stuff) retrieved = self.storage.retrieve() @@ -320,14 +327,15 @@ class TransactionUnstuckerTest(TestCase, EthereumMocks): # testeo contra transacciones cacheadas que siguen pendientes, def test_unstuck_integrated_memcached_custom_client(self): - m_client = base.Client(('172.17.0.1', 11211)) # me hago mi propio cliente - m_client.set('tst_pending_txs', []) # limpio por las dudas sobre una key propia que no sea la de lso settings + m_client = base.Client((TEST_MEMCACHED_HOST, TEST_MEMCACHED_PORT)) # me hago mi propio cliente + m_client.set(TEST_PENDING_TXS_MEMCACHED_KEY, []) # limpio por las dudas sobre otra key mock = self.EthereumMock() # mockeo el gateway de ethereum service = PendingTransactionsService() service.criteria.storage.client = m_client # setteo mi cliente - service.criteria.storage.key = 'tst_pending_txs' + service.criteria.storage.key = TEST_PENDING_TXS_MEMCACHED_KEY service.criteria.storage.store(mock.pending_transactions()) # y le asigno las pendientes mockeadas - service.gateway = mock # asigno los mocks que quedan + + service.gateway = mock # asigno los mocks que armé self.unstucker.gateway = mock self.unstucker.pending_transaction_service = service self.unstucker.storage = service.criteria.storage @@ -339,8 +347,8 @@ class TransactionUnstuckerTest(TestCase, EthereumMocks): # testeo contra nada de transacciones cacheadas, y transacciones pendientes en el momento def test_unstuck_integrated_memcached_custom_client_nothing_to_do(self): - m_client = base.Client(('172.17.0.1', 11211)) - m_client.set('tst_pending_txs', []) + m_client = base.Client((TEST_MEMCACHED_HOST, TEST_MEMCACHED_PORT)) + m_client.set(TEST_PENDING_TXS_MEMCACHED_KEY, []) mock = self.EthereumMock() service = PendingTransactionsService() service.criteria.storage.client = m_client