#!/usr/bin/env python
# nullanvoid :: bitjammin
import RPi.GPIO as GPIO
import serial
import sys
import time
def read_buf(port, chunk_size=200):
    """Drain *port* and return everything read as a single bytes object.

    The port is read ``chunk_size`` bytes at a time; each read is expected
    to block for at most the port's timeout. A chunk shorter than
    ``chunk_size`` (including an empty one) ends the loop.

    Exits the process with status 1 when the port has no timeout set
    (``port.timeout`` is falsy).
    """
    if not port.timeout:
        print('Port needs to have a timeout set')
        sys.exit(1)

    pieces = []
    while True:
        piece = port.read(size=chunk_size)
        pieces.append(piece)
        # A short read means the timeout expired before the chunk filled up,
        # so there is nothing more to collect.
        if len(piece) != chunk_size:
            return b"".join(pieces)
def read_port(port):
    """Read pending data from *port*, append it to ./modem.log and return it.

    Returns the decoded text, or None when ``port.inWaiting()`` reports that
    nothing is waiting to be read.
    """
    if not port.inWaiting():
        return None

    # Pause before draining so the rest of the reply can arrive.
    time.sleep(2)
    # Decode once, replacing undecodable bytes: a strict decode would raise
    # UnicodeDecodeError on a single stray byte from the serial line.
    text = read_buf(port).decode('utf-8', 'replace')
    with open('./modem.log', 'a+') as log:
        log.write(text)
    return text
class Modem:
    """Modem controlled through a serial port and one GPIO power pin."""

    def __init__(self, power_pin, port):
        self.port = port
        self.power_pin = power_pin

    def is_on(self):
        """Probe the modem with three 'AT' commands.

        Returns 0 when the collected reply contains 'OK', otherwise 1
        (including when no reply was read at all).
        """
        probe = 'AT\r\n'.encode()
        for _ in range(3):
            self.port.write(probe)
            time.sleep(1)

        reply = read_port(self.port)
        if reply is not None and 'OK' in reply:
            return 0
        return 1

    def _pulse_power_pin(self, settle, hold):
        # Configure the pin as a BCM output, optionally wait `settle` seconds,
        # drive it HIGH for `hold` seconds, then LOW and wait 5 seconds.
        GPIO.setmode(GPIO.BCM)
        GPIO.setwarnings(False)
        GPIO.setup(self.power_pin, GPIO.OUT)
        if settle:
            time.sleep(settle)
        GPIO.output(self.power_pin, GPIO.HIGH)
        time.sleep(hold)
        GPIO.output(self.power_pin, GPIO.LOW)
        time.sleep(5)

    # Provided by waveshare:
    # https://www.waveshare.com/w/upload/e/e6/SIM7080G_Cat_M_NB_IoT_HAT_Code.tar.gz
    def power(self, state):
        """Toggle modem power: state 1 powers up, state 0 powers down.

        Powering up is skipped when is_on() already reports the modem as on.
        Any other state prints an error and exits the process with status 1.
        Returns 0 otherwise.
        """
        if state not in (0, 1):
            print("Wrong state...")
            sys.exit(1)

        if state == 1:
            if self.is_on() == 0:
                print('Modem is on... skipping')
                return 0
            print("Powering up SIM7080G...")
            self._pulse_power_pin(settle=1, hold=1)
        else:
            print("Powering down SIM7080G...")
            self._pulse_power_pin(settle=0, hold=2)
        return 0
class SMS:
    """SMS operations issued as AT commands over a serial port."""

    def __init__(self, port):
        self.port = port

    def delete(self, msg_id, flag=0):
        """Send AT+CMGD for *msg_id* with *flag*.

        Returns 0 when the reply contains 'OK', otherwise prints an error
        and returns 1.
        """
        at_command = 'AT+CMGD=%s,%s\r\n' % (msg_id, flag)
        self.port.write(at_command.encode())
        time.sleep(1)
        # read_port() returns None when nothing is waiting; treat that as an
        # empty reply instead of failing on the membership test.
        resp = read_port(self.port) or ''
        if 'OK' not in resp:
            print('Error deleting message: %s\n%s' % (msg_id, resp))
            return 1
        return 0

    def mode(self, mode):
        """Select the message format with AT+CMGF.

        Modes: 0: PDU, 1: text. Returns 0 when the reply contains 'OK',
        otherwise 1.
        """
        at_command = 'AT+CMGF=%s\r\n' % (mode)
        self.port.write(at_command.encode())
        time.sleep(1)
        # A missing reply (None) counts as a failure rather than a crash.
        if 'OK' not in (read_port(self.port) or ''):
            return 1
        return 0

    @staticmethod
    def _parse_cmgl(raw):
        # Turn the text of an AT+CMGL reply into {msg_id: message fields}.
        messages = dict()
        lines = iter(raw.split('\n'))
        for line in lines:
            # From the serial console we may pick up the command echo,
            # '+CMTI' or 'DST' lines; only '+CMGL:' lines describe a message.
            if not line.lstrip().startswith('+CMGL:'):
                continue
            fields = line.split(',')
            try:
                # int() for index values
                msg_id = int(fields[0].split(':', 1)[1])
            except ValueError:
                continue
            # Some headers carry fewer fields; pad so indexing cannot fail.
            fields.extend([''] * (6 - len(fields)))
            # Remove any padding and remove '"'
            status, sender, mr, date, clock = (
                field.rstrip().replace('"', '') for field in fields[1:6]
            )
            messages[msg_id] = {
                'status': status,
                'from_address': sender,
                'mr': mr,
                'received_date': date,
                'received_time': clock,
                # The message text is the line right after its header;
                # consuming it here keeps it from being parsed as a header.
                'message': next(lines, '').rstrip(),
            }
        return messages

    @staticmethod
    def _count_unread(messages):
        # Count entries whose status is 'REC UNREAD', whatever their ids are.
        return sum(
            1 for entry in messages.values()
            if entry['status'] == 'REC UNREAD'
        )

    def read(self):
        """List all stored messages with AT+CMGL="ALL".

        Returns a dict keyed by integer message id; each value is a dict
        with the keys 'status', 'from_address', 'mr', 'received_date',
        'received_time' and 'message'. Returns 1 when text mode cannot be
        entered. The number of unread messages is appended to ./modem.log.
        """
        if self.mode(1) == 1:
            print('Error entering text mode')
            return 1
        self.port.write('AT+CMGL="ALL"\r\n'.encode())
        time.sleep(1)
        # No reply at all (None) is handled as "no messages".
        sms_struct = self._parse_cmgl(read_port(self.port) or '')
        unread_messages = self._count_unread(sms_struct)
        with open("./modem.log", "a+") as log:
            log.write('Received %s new messages\n' % (unread_messages))
        return sms_struct

    def send(self, number, message):
        """Send *message* to *number* with AT+CMGS and print the reply.

        Returns 1 without sending when text mode cannot be entered or when
        the message is longer than 160 characters; otherwise returns None.
        """
        if self.mode(1) == 1:
            print('Error entering text mode')
            # Do not push a text-mode command at a modem that refused it.
            return 1
        # If exceeded, modem throws "+CMS ERROR: 305"
        if len(message) > 160:
            print('Message length exceeds 160 bytes')
            return 1
        at_command = 'AT+CMGS="%s"\r\n' % (number)
        self.port.write(at_command.encode())
        time.sleep(1)
        # Send the message, followed by \x1a (ctrl+z)
        message = message + chr(26)
        self.port.write(message.encode())
        # Max 60s response time
        time.sleep(7)
        print(read_port(self.port))
def main():
    """Power up the modem on /dev/ttyS0 and print every stored SMS.

    Exits with status 1 when the messages cannot be read. The serial port
    is closed on every exit path.
    """
    com = serial.Serial('/dev/ttyS0', 9600, timeout=3)
    try:
        modem = Modem(4, com)
        modem.power(1)
        sms = SMS(com)
        msgs = sms.read()
        # SMS.read() returns 1 instead of a dict when text mode fails;
        # calling .items() on that would raise AttributeError.
        if not isinstance(msgs, dict):
            print('Unable to read SMS messages')
            sys.exit(1)
        for msg in msgs.items():
            print(msg)
    finally:
        com.close()


if __name__ == '__main__':
    main()