#!/data/data/com.termux/files/usr/bin/python3 from subprocess import Popen, DEVNULL, PIPE from json import loads as read_json from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer as http4_server_do_not_use from urllib.parse import quote, unquote from socket import AF_INET6 from threading import Thread from time import sleep from cgi import FieldStorage from os import kill, getpid sms_list_program = '/data/data/com.termux/files/usr/bin/termux-sms-list' sms_send_program = '/data/data/com.termux/files/usr/bin/termux-sms-send' contacts_list_program = '/data/data/com.termux/files/usr/bin/termux-contact-list' class HTTPServer (http4_server_do_not_use): ''' This block is just to have a IPv6 web server This is not IPv6 only, the server also accepts IPv4 clients ''' address_family = AF_INET6 def html_header (title = 'No title'): return ('\n\n\t' + title + '\n\n') def html_title (text): return ('\n\t

' + text + '

') def html_paragraph (text): return ('\n\t

' + text + '

') def html_list (lst): t = '\n\t' return (t) def html_link (url, text = None): if text is None: text = url return ('' + text + '') def html_table (body, table_style = None, cell_style = None): t = '\n\t' for line in body: t += '\n\t\t' for cell in line: t += '\n\t\t\t' + cell + '' t += '\n\t\t' t += '\n\t' return (t) def html_form (body, url, post = True): return ('\n\t
' + body + '\n\t
') def html_form_text (id, value = '', textarea = False): if textarea: return ('\n\t') return ('\n\t') def html_form_hidden (id, value): return ('\n\t') def html_form_submit (label): return ('\n\t') def html_footer (): return ('\n\n') class SMS_Service (Thread): def __init__ (self): Thread.__init__ (self) self.sms_list = [] def run (self): limit_fast_get = 1000000 limit_normal_get = 20 offset = 0 while True: msgs = self.get_sms (limit = limit_fast_get, offset = offset) if len (msgs) != 0: self.sms_list += msgs if len (msgs) < limit_fast_get: break offset += limit_fast_get for msg in self.sms_list: del (msg ['read']) while True: sleep (5) getting_new = True offset = 0 while getting_new: msgs = self.get_sms (limit = limit_normal_get, offset = offset) getting_new = False for msg in msgs: del (msg ['read']) for msg in msgs: if msg not in self.sms_list: getting_new = True self.sms_list.append (msg) offset += limit_normal_get def get_sms (self, limit = 10, offset = 0): while True: try: if type (limit) != int or type (offset) != int or limit < 1 or offset < 0: raise (TypeError ('limit and offset must be positive integers')) p = Popen ( [sms_list_program, '-l', str (limit), '-o', str (offset)], stdin = DEVNULL, stdout = PIPE ) return (read_json (p.communicate () [0])) except Exception: pass def send_sms (self, dest, message): p = Popen ( [sms_send_program, '-n', dest.replace (' ', '')], stdin = PIPE, stdout = DEVNULL ) p.communicate (message.encode ('utf-8')) def sorted_list (self): return (sorted (self.sms_list, key = lambda item: item ['received'])) def thread_list (self): l = {} for i in self.sms_list: if i ['threadid'] in l: if l [i ['threadid']] [1] < i ['received']: l [i ['threadid']] [1] = i ['received'] l [i ['threadid']] [2] = i ['body'] l [i ['threadid']] [3] += 1 else: if 'sender' in i: l [i ['threadid']] = [i ['number'], i ['received'], i ['body'], 1, i ['sender']] else: l [i ['threadid']] = [i ['number'], i ['received'], i ['body'], 1] return (l) def get_thread (self, id): l = [] number = None name = None for i in self.sms_list: if str (i ['threadid']) == str (id): l.append ((i ['received'], True if i ['type'] == 'inbox' else False, i ['body'])) if number is None: number = i ['number'] if 'sender' in i: name = i ['sender'] return (number, name, sorted (l, key = lambda item: item [0])) def contact_list (): p = Popen ( [contacts_list_program, ], stdin = DEVNULL, stdout = PIPE ) return (read_json (p.communicate () [0])) class Web_Service (BaseHTTPRequestHandler): def do_GET (self): global locked_client client = self.client_address [0] if locked_client is None: locked_client = client print ('Locking on ' + str (client) + '…') if locked_client == client: global sms if self.path == '/': self.send_response (200) self.send_header ('Content-type', 'text/html; charset=UTF-8') self.end_headers () ls = sms.thread_list () lst = [] for i in sorted (ls, key = lambda item: ls [item] [1], reverse = True): s = ls [i] [0] + ' (' + str (ls [i] [3]) + ')' if len (ls [i]) == 5: s = ls [i] [4] + ' (' + str (ls [i] [3]) + ')' lst.append ([html_link ('/?thread=' + str (i), text = s), ls [i] [1], ls [i] [2] [0:50]]) self.wfile.write (( html_header (title = 'Conversations list') + html_title ('Conversation') + html_paragraph (html_link ('/new', text = 'New conversation')) + html_paragraph (html_link ('/', text = 'Refresh list')) + html_paragraph ('List of conversations follows:') + html_table (lst) + html_footer () ).encode ('utf-8')) elif self.path == '/new': self.send_response (200) self.send_header ('Content-type', 'text/html; charset=UTF-8') self.end_headers () ls = contact_list () lst = [] for i in sorted (ls, key = lambda item: item ['name']): lst.append (html_link ('/new?num=' + quote (i ['number']), text = i ['name'] + ' (' + i ['number'] + ')')) self.wfile.write (( html_header (title = 'New conversation') + html_paragraph (html_link ('/', text = '← Back to list')) + html_title ('Create a new conversation') + html_paragraph ('Put number here:') + html_form ( html_form_text ('num') + html_form_submit ('Validate'), '/new', post = False ) + html_paragraph ('List of contacts follows:') + html_list (lst) + html_footer () ).encode ('utf-8')) elif self.path [0:9] == '/new?num=': self.send_response (200) self.send_header ('Content-type', 'text/html; charset=UTF-8') self.end_headers () number = self.path [9:] self.wfile.write (( html_header (title = 'New conversation') + html_paragraph (html_link ('/', text = '← Back to list')) + html_paragraph (html_link ('/new', text = '← Back to new message')) + html_title ('Create a new conversation with ' + unquote (number)) + html_paragraph ('Message:') + html_form ( html_form_text ('msg', textarea = True) + html_form_hidden ('number', unquote (number)) + html_form_submit ('Send'), '/send' ) + html_footer () ).encode ('utf-8')) elif self.path [0:9] == '/?thread=': try: id = self.path [9:] number, name, conv = sms.get_thread (id) if number is None: raise (IndexError) except (ValueError, IndexError): self.send_response (404) self.send_header ('Content-type', 'text/html') self.end_headers () self.wfile.write (( html_header (title = 'Page not found') + html_title ('Not found') + html_paragraph ('This page does not exist. You cas return to the ' + html_link ('/', text = 'home') + '.') + html_footer () ).encode ('utf-8')) return () self.send_response (200) self.send_header ('Content-type', 'text/html; charset=UTF-8') self.end_headers () t = [] for i in sorted (conv, key = lambda item: item [0], reverse = True) [0:2000]: if i [1]: t.append ([i [2], '↓ ' + i [0]]) else: t.append (['↑ ' + i [0], i [2]]) if name is None: name = number else: name = name + ' (' + number + ')' self.wfile.write (( html_header (title = 'Conversation with ' + name) + html_paragraph (html_link ('/', text = '← Back to list')) + html_title (name) + html_form ( html_form_text ('msg', textarea = True) + html_form_hidden ('number', number) + html_form_submit ('Send'), '/send' ) + html_link (self.path, text = 'Refresh') + html_table (t, table_style = 'table-layout: fixed; border: 1px solid black', cell_style = 'width: 500px; border: 1px solid black') + html_footer () ).encode ('utf-8')) else: self.send_response (404) self.send_header ('Content-type', 'text/html') self.end_headers () self.wfile.write (( html_header (title = 'Page not found') + html_title ('Not found') + html_paragraph ('This page does not exist. You cas return to the ' + html_link ('/', text = 'home') + '.') + html_footer () ).encode ('utf-8')) else: self.send_response (403) self.send_header ('Content-type', 'text/html') self.end_headers () self.wfile.write (( html_header (title = 'Access denied') + html_title ('Access denied') + html_paragraph ('Access have allready been granted to another host.') + html_footer () ).encode ('utf-8')) def do_POST (self): global locked_client client = self.client_address [0] if locked_client is None: locked_client = client print ('Locking on ' + str (client) + '…') if locked_client == client: global sms if self.path == '/send': post_data = FieldStorage (fp = self.rfile, headers = self.headers, environ = {'REQUEST_METHOD': 'POST'}) sms.send_sms (post_data.getvalue ('number'), post_data.getvalue ('msg')) self.send_response (301) self.send_header ('Location', self.headers.get ('Referer')) self.end_headers () else: self.send_response (404) self.send_header ('Content-type', 'text/html') self.end_headers () self.wfile.write (( html_header (title = 'Page not found') + html_title ('Not found') + html_paragraph ('This page does not exist. You cas return to the ' + html_link ('/', text = 'home') + '.') + html_footer () ).encode ('utf-8')) else: self.send_response (403) self.send_header ('Content-type', 'text/html') self.end_headers () self.wfile.write (( html_header (title = 'Access denied') + html_title ('Access denied') + html_paragraph ('Access have allready been granted to another host.') + html_footer () ).encode ('utf-8')) if __name__ == '__main__': sms = SMS_Service () server = HTTPServer (('::', 8080), Web_Service) locked_client = None print ('Started!') try: sms.start () server.serve_forever () except KeyboardInterrupt: print ('Interrupt received, closing process…') server.server_close () # BUG: the program must kill itself to end or the user must send Ctrl+C several time to close # TODO: find another way to close the program in one action, suicide is never the solution kill (getpid (), 15)