diff --git a/arithmetic_coder.py b/arithmetic_coder.py new file mode 100644 index 0000000..99dade3 --- /dev/null +++ b/arithmetic_coder.py @@ -0,0 +1,97 @@ +from fractions import Fraction + +def uniform(syms): + def pdist(seq): + return {s: Fraction(1, len(syms)) for s in syms} + return pdist + +def cdfs(distribution): + cdf_lo = {} + cdf_hi = {} + total_probability = 0 + for sym, prob in distribution.items(): + cdf_lo[sym] = total_probability # we could also use the last symbol as the lower bound but this is more explicit + total_probability += prob + cdf_hi[sym] = total_probability + return cdf_lo, cdf_hi + +def encode(sequence, pdist): + a = Fraction(0, 1) + b = Fraction(1, 1) + + bits = [] + + for i, x in enumerate(sequence): + distribution = pdist(sequence[:i]) + cdf_lo, cdf_hi = cdfs(distribution) + rang = b - a + b = a + rang * cdf_hi[x] + a += rang * cdf_lo[x] + + # if our interval is sufficiently constrained, emit bits + while True: + if a < b < Fraction(1, 2): + bits.append(0) + a *= 2 + b *= 2 + elif Fraction(1, 2) < a < b: + bits.append(1) + a *= 2 + b *= 2 + a -= 1 + b -= 1 + else: + break + + # it may still be the case that a and b are in [0,1], i.e. we have not emitted enough bits + # to constrain it in the decoder + while a > 0 or b < 1: + c = (a + b) / 2 # slightly suboptimal: try and write out midpoint + if c < Fraction(1, 2): + bits.append(0) + a *= 2 + b *= 2 + else: + bits.append(1) + a *= 2 + a -= 1 + b *= 2 + b -= 1 + + return bits + +def decode(bits, pdist): + a = Fraction(0, 1) + b = Fraction(1, 1) + scale = Fraction(1, 2) + + sequence = [] + while bits: + distribution = pdist(sequence) + cdf_lo, cdf_hi = cdfs(distribution) + for (lo, hi), sym in zip(zip(cdf_lo.values(), cdf_hi.values()), cdf_lo.keys()): + if lo <= a < b <= hi: # if message interval is subset of symbol interval, emit symbol + sequence.append(sym) + # expand working interval and adjust scale up + # (effectively, consume symbol in weird base) + range = hi - lo + a = (a - lo) / range + b = (b - lo) / range + scale /= range + break + else: + # consume a bit + bit = bits.pop(0) + a += bit * scale + b = a + scale + scale /= 2 + + return sequence + +dist = lambda seq: {"a": Fraction(98, 100), "b": Fraction(1, 100), "c": Fraction(1, 100)} +input = "abbabaccabbabbbbbba" +bits = encode(input, dist) +print(bits) +result = "".join(decode(bits, dist)) +print(input, result) +assert input == result, "oh no" \ No newline at end of file diff --git a/bmp280_prometheus.py b/bmp280_prometheus.py new file mode 100644 index 0000000..7af0f03 --- /dev/null +++ b/bmp280_prometheus.py @@ -0,0 +1,84 @@ +import smbus +import struct +import time +from prometheus_client import start_http_server, Gauge +import sys + +location = sys.argv[1] + +bus = smbus.SMBus(1) +ADDR = 0x76 +ID_REGISTER = 0xD0 +TARGET_ID = 0x58 +CTRL_MEAS_REGISTER = 0xF4 +PRESSURE_REGISTER_BASE = 0xF7 +TEMP_REGISTER_BASE = 0xFA +CALIBRAITON_VALUES = { + "dig_T1": {"address": (0x88, 0x89), "unpack": "> 4 + +def bmp280_compensate_T_double(adc_T, dig_T1, dig_T2, dig_T3, **kwargs): + var1 = (adc_T / 16384.0 - dig_T1 / 1024.0) * dig_T2 + var2 = ((adc_T / 131072.0 - dig_T1 / 8192.0) * + (adc_T / 131072.0 - dig_T1 / 8192.0)) * dig_T3 + t_fine = int(var1 + var2) + T = (var1 + var2) / 5120.0 + return T, t_fine + +def bmp280_compensate_P_double(adc_P, t_fine, dig_P1, dig_P2, dig_P3, dig_P4, dig_P5, dig_P6, dig_P7, dig_P8, dig_P9, **kwargs): + var1 = t_fine / 2.0 - 64000.0 + var2 = var1 * var1 * dig_P6 / 32768.0 + var2 = var2 + var1 * dig_P5 * 2.0 + var2 = (var2 / 4.0) + (dig_P4 * 65536.0) + var1 = (dig_P3 * var1 * var1 / 524288.0 + dig_P2 * var1) / 524288.0 + var1 = (1.0 + var1 / 32768.0) * dig_P1 + if var1 == 0.0: + return 0 # avoid exception caused by division by zero + p = 1048576.0 - adc_P + p = (p - (var2 / 4096.0)) * 6250.0 / var1 + var1 = dig_P9 * p * p / 2147483648.0 + var2 = p * dig_P8 / 32768.0 + p = p + (var1 + var2 + dig_P7) / 16.0 + return p + +calibration = setup() + +temperature = Gauge("bmp280_temperature", "Temperature in Celsius", ("location",)) +pressure = Gauge("bmp280_pressure", "Pressure in Pascals", ("location",)) + +def read_temperature_pressure(): + adc_T = read_raw_adc(TEMP_REGISTER_BASE) + adc_P = read_raw_adc(PRESSURE_REGISTER_BASE) + T, t_fine = bmp280_compensate_T_double(adc_T, **calibration) + P = bmp280_compensate_P_double(adc_P, t_fine, **calibration) + temperature.labels(location=location).set(T) + pressure.labels(location=location).set(P) + +start_http_server(9091) + +while True: + read_temperature_pressure() + time.sleep(1) \ No newline at end of file