|
| 1 | +from collections import Counter |
| 2 | +from random import random |
| 3 | +from typing import Dict, List, Tuple |
| 4 | + |
| 5 | + |
| 6 | +class MarkovChainGraphUndirectedUnweighted: |
| 7 | + ''' |
| 8 | + Undirected Unweighted Graph for running Markov Chain Algorithm |
| 9 | + ''' |
| 10 | + |
| 11 | + def __init__(self): |
| 12 | + self.connections = {} |
| 13 | + |
| 14 | + def add_node(self, node: str) -> None: |
| 15 | + self.connections[node] = {} |
| 16 | + |
| 17 | + def add_transition_probability(self, node1: str, |
| 18 | + node2: str, |
| 19 | + probability: float) -> None: |
| 20 | + if node1 not in self.connections: |
| 21 | + self.add_node(node1) |
| 22 | + if node2 not in self.connections: |
| 23 | + self.add_node(node2) |
| 24 | + self.connections[node1][node2] = probability |
| 25 | + |
| 26 | + def get_nodes(self) -> List[str]: |
| 27 | + return list(self.connections) |
| 28 | + |
| 29 | + def transition(self, node: str) -> str: |
| 30 | + current_probability = 0 |
| 31 | + random_value = random() |
| 32 | + |
| 33 | + for dest in self.connections[node]: |
| 34 | + current_probability += self.connections[node][dest] |
| 35 | + if current_probability > random_value: |
| 36 | + return dest |
| 37 | + |
| 38 | + |
| 39 | +def get_transitions(start: str, |
| 40 | + transitions: List[Tuple[str, str, float]], |
| 41 | + steps: int) -> Dict[str, int]: |
| 42 | + ''' |
| 43 | + Running Markov Chain algorithm and calculating the number of times each node is |
| 44 | + visited |
| 45 | +
|
| 46 | + >>> transitions = [ |
| 47 | + ... ('a', 'a', 0.9), |
| 48 | + ... ('a', 'b', 0.075), |
| 49 | + ... ('a', 'c', 0.025), |
| 50 | + ... ('b', 'a', 0.15), |
| 51 | + ... ('b', 'b', 0.8), |
| 52 | + ... ('b', 'c', 0.05), |
| 53 | + ... ('c', 'a', 0.25), |
| 54 | + ... ('c', 'b', 0.25), |
| 55 | + ... ('c', 'c', 0.5) |
| 56 | + ... ] |
| 57 | +
|
| 58 | + >>> result = get_transitions('a', transitions, 5000) |
| 59 | +
|
| 60 | + >>> result['a'] > result['b'] > result['c'] |
| 61 | + True |
| 62 | + ''' |
| 63 | + |
| 64 | + graph = MarkovChainGraphUndirectedUnweighted() |
| 65 | + |
| 66 | + for node1, node2, probability in transitions: |
| 67 | + graph.add_transition_probability(node1, node2, probability) |
| 68 | + |
| 69 | + visited = Counter(graph.get_nodes()) |
| 70 | + node = start |
| 71 | + |
| 72 | + for _ in range(steps): |
| 73 | + node = graph.transition(node) |
| 74 | + visited[node] += 1 |
| 75 | + |
| 76 | + return visited |
| 77 | + |
| 78 | + |
| 79 | +if __name__ == "__main__": |
| 80 | + import doctest |
| 81 | + |
| 82 | + doctest.testmod() |
0 commit comments