# -*- coding: utf-8 -*-
import time
from .instance import AbstractBlockchainInstanceProvider
class Blockchain(AbstractBlockchainInstanceProvider):
""" This class allows to access the blockchain and read data
from it
:param instance blockchain_instance: instance to use when accesing a RPC
:param str mode: (default) Irreversible block (``irreversible``) or
actual head block (``head``)
:param int max_block_wait_repetition: (default) 3 maximum wait time for
next block ismax_block_wait_repetition * block_interval
This class let's you deal with blockchain related data and methods.
"""
def __init__(
self,
blockchain_instance=None,
mode="irreversible",
max_block_wait_repetition=None,
*args,
**kwargs
):
self.define_classes()
assert self.block_class
assert self.operationids
self._parameters = None
if mode == "irreversible":
self.mode = "last_irreversible_block_num"
elif mode == "head":
self.mode = "head_block_number"
else:
raise ValueError("invalid value for 'mode'!")
if max_block_wait_repetition:
self.max_block_wait_repetition = max_block_wait_repetition
else:
self.max_block_wait_repetition = 3
[docs] def is_irreversible_mode(self):
return self.mode == "last_irreversible_block_num"
[docs] def info(self):
""" This call returns the *dynamic global properties*
"""
return self.blockchain.rpc.get_dynamic_global_properties()
[docs] def chainParameters(self):
""" The blockchain parameters, such as fees, and committee-controlled
parameters are returned here
"""
if not self._parameters:
self.update_chain_parameters()
return self._parameters
[docs] def update_chain_parameters(self):
self._parameters = self.config()["parameters"]
[docs] def get_network(self):
""" Identify the network
:returns: Network parameters
:rtype: dict
"""
return self.blockchain.rpc.get_network()
[docs] def get_chain_properties(self):
""" Return chain properties
"""
return self.blockchain.rpc.get_chain_properties()
[docs] def config(self):
""" Returns object 2.0.0
"""
return self.blockchain.rpc.get_object("2.0.0")
[docs] def get_current_block_num(self):
""" This call returns the current block
.. note:: The block number returned depends on the ``mode`` used
when instanciating from this class.
"""
return self.info().get(self.mode)
[docs] def get_current_block(self):
""" This call returns the current block
.. note:: The block number returned depends on the ``mode`` used
when instanciating from this class.
"""
return self.block_class(
self.get_current_block_num(), blockchain_instance=self.blockchain
)
[docs] def get_block_interval(self):
""" This call returns the block interval
"""
return self.chainParameters().get("block_interval")
[docs] def block_time(self, block_num):
""" Returns a datetime of the block with the given block
number.
:param int block_num: Block number
"""
return self.block_class(block_num, blockchain_instance=self.blockchain).time()
[docs] def block_timestamp(self, block_num):
""" Returns the timestamp of the block with the given block
number.
:param int block_num: Block number
"""
return int(
self.block_class(block_num, blockchain_instance=self.blockchain)
.time()
.timestamp()
)
[docs] def blocks(self, start=None, stop=None):
""" Yields blocks starting from ``start``.
:param int start: Starting block
:param int stop: Stop at this block
:param str mode: We here have the choice between
"head" (the last block) and "irreversible" (the block that is
confirmed by 2/3 of all block producers and is thus irreversible)
"""
# Let's find out how often blocks are generated!
self.block_interval = self.get_block_interval()
if not start:
start = self.get_current_block_num()
# We are going to loop indefinitely
while True:
# Get chain properies to identify the
if stop:
head_block = stop
else:
head_block = self.get_current_block_num()
# Blocks from start until head block
for blocknum in range(start, head_block + 1):
# Get full block
block = self.wait_for_and_get_block(blocknum)
block.update({"block_num": blocknum})
yield block
# Set new start
start = head_block + 1
if stop and start > stop:
return
# Sleep for one block
time.sleep(self.block_interval)
[docs] def wait_for_and_get_block(self, block_number, blocks_waiting_for=None):
""" Get the desired block from the chain, if the current head block is
smaller (for both head and irreversible) then we wait, but a
maxmimum of blocks_waiting_for * max_block_wait_repetition time
before failure.
:param int block_number: desired block number
:param int blocks_waiting_for: (default) difference between
block_number and current head how many blocks we are willing to
wait, positive int
"""
if not blocks_waiting_for:
blocks_waiting_for = max(1, block_number - self.get_current_block_num())
repetition = 0
# can't return the block before the chain has reached it (support
# future block_num)
while self.get_current_block_num() < block_number:
repetition += 1
time.sleep(self.block_interval)
if repetition > blocks_waiting_for * self.max_block_wait_repetition:
raise Exception("Wait time for new block exceeded, aborting")
# block has to be returned properly
block = self.blockchain.rpc.get_block(block_number)
repetition = 0
while not block:
repetition += 1
time.sleep(self.block_interval)
if repetition > self.max_block_wait_repetition:
raise Exception("Wait time for new block exceeded, aborting")
block = self.blockchain.rpc.get_block(block_number)
return block
[docs] def ops(self, start=None, stop=None, **kwargs):
""" Yields all operations (excluding virtual operations) starting from
``start``.
:param int start: Starting block
:param int stop: Stop at this block
:param str mode: We here have the choice between
"head" (the last block) and "irreversible" (the block that is
confirmed by 2/3 of all block producers and is thus irreversible)
:param bool only_virtual_ops: Only yield virtual operations
This call returns a list that only carries one operation and
its type!
"""
for block in self.blocks(start=start, stop=stop, **kwargs):
for tx in block["transactions"]:
for op in tx["operations"]:
# Replace opid by op name
op[0] = self.operationids.getOperationName(op[0])
yield {
"block_num": block["block_num"],
"op": op,
"timestamp": block["timestamp"],
}
[docs] def stream(self, opNames=[], *args, **kwargs):
""" Yield specific operations (e.g. comments) only
:param array opNames: List of operations to filter for
:param int start: Start at this block
:param int stop: Stop at this block
:param str mode: We here have the choice between
* "head": the last block
* "irreversible": the block that is confirmed by 2/3 of all
block producers and is thus irreversible!
The dict output is formated such that ``type`` caries the
operation type, timestamp and block_num are taken from the
block the operation was stored in and the other key depend
on the actualy operation.
"""
for op in self.ops(**kwargs):
if not opNames or op["op"][0] in opNames:
r = {
"type": op["op"][0],
"timestamp": op.get("timestamp"),
"block_num": op.get("block_num"),
}
r.update(op["op"][1])
yield r
[docs] def awaitTxConfirmation(self, transaction, limit=10):
""" Returns the transaction as seen by the blockchain after being
included into a block
.. note:: If you want instant confirmation, you need to instantiate
class:`.blockchain.Blockchain` with
``mode="head"``, otherwise, the call will wait until
confirmed in an irreversible block.
.. note:: This method returns once the blockchain has included a
transaction with the **same signature**. Even though the
signature is not usually used to identify a transaction,
it still cannot be forfeited and is derived from the
transaction contented and thus identifies a transaction
uniquely.
"""
counter = 0
for block in self.blocks():
counter += 1
for tx in block["transactions"]:
if sorted(tx["signatures"]) == sorted(transaction["signatures"]):
return tx
if counter > limit:
raise Exception("The operation has not been added after 10 blocks!")
[docs] def get_all_accounts(self, start="", stop="", steps=1e3, **kwargs):
""" Yields account names between start and stop.
:param str start: Start at this account name
:param str stop: Stop at this account name
:param int steps: Obtain ``steps`` ret with a single call from RPC
"""
lastname = start
while True:
ret = self.blockchain.rpc.lookup_accounts(lastname, steps)
for account in ret:
yield account[0]
if account[0] == stop:
return
if lastname == ret[-1][0]:
return
lastname = ret[-1][0]
if len(ret) < steps:
return
@property
def participation_rate(self):
return bin(int(self.info()["recent_slots_filled"])).count("1") / 128