From 1d59dc90227fca3b08e3e5a26a63aa7da539b9ee Mon Sep 17 00:00:00 2001 From: Sasha MOREL Date: Thu, 2 Sep 2021 11:54:40 +0200 Subject: [PATCH] Initial commit Imported this script cooked in another repos. --- sms.py | 360 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 360 insertions(+) create mode 100755 sms.py diff --git a/sms.py b/sms.py new file mode 100755 index 0000000..009cfc5 --- /dev/null +++ b/sms.py @@ -0,0 +1,360 @@ +#!/data/data/com.termux/files/usr/bin/python3 + +from subprocess import Popen, DEVNULL, PIPE +from json import loads as read_json +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer as http4_server_do_not_use +from urllib.parse import quote, unquote +from socket import AF_INET6 +from threading import Thread +from time import sleep +from cgi import FieldStorage +from sys import exit + +sms_list_program = '/data/data/com.termux/files/usr/bin/termux-sms-list' +sms_send_program = '/data/data/com.termux/files/usr/bin/termux-sms-send' +contacts_list_program = '/data/data/com.termux/files/usr/bin/termux-contact-list' + + +class HTTPServer (http4_server_do_not_use): + ''' + This block is just to have a IPv6 web server + This is not IPv6 only, the server also accepts IPv4 clients + ''' + address_family = AF_INET6 + + +def html_header (title = 'No title'): + return ('\n\n\t' + title + '\n\n') + + +def html_title (text): + return ('\n\t

' + text + '

') + + +def html_paragraph (text): + return ('\n\t

' + text + '

') + + +def html_list (lst): + t = '\n\t' + return (t) + + +def html_link (url, text = None): + if text is None: + text = url + return ('' + text + '') + + +def html_table (body, table_style = None, cell_style = None): + t = '\n\t' + for line in body: + t += '\n\t\t' + for cell in line: + t += '\n\t\t\t' + cell + '' + t += '\n\t\t' + t += '\n\t' + return (t) + + +def html_form (body, url, post = True): + return ('\n\t
' + body + '\n\t
') + + +def html_form_text (id, value = '', textarea = False): + if textarea: + return ('\n\t') + return ('\n\t') + + +def html_form_hidden (id, value): + return ('\n\t') + + +def html_form_submit (label): + return ('\n\t') + + +def html_footer (): + return ('\n\n') + + +class SMS_Service (Thread): + def __init__ (self): + Thread.__init__ (self) + self.sms_list = [] + + def run (self): + limit_fast_get = 1000000 + limit_normal_get = 20 + offset = 0 + while True: + msgs = self.get_sms (limit = limit_fast_get, offset = offset) + if len (msgs) != 0: + self.sms_list += msgs + if len (msgs) < limit_fast_get: + break + offset += limit_fast_get + while True: + sleep (10) + getting_new = True + offset = 0 + while getting_new: + msgs = self.get_sms (limit = limit_normal_get, offset = offset) + getting_new = False + for msg in msgs: + if msg not in self.sms_list: + getting_new = True + self.sms_list.append (msg) + offset += limit_normal_get + + def get_sms (self, limit = 10, offset = 0): + while True: + try: + if type (limit) != int or type (offset) != int or limit < 1 or offset < 0: + raise (TypeError ('limit and offset must be positive integers')) + p = Popen ( + [sms_list_program, '-l', str (limit), '-o', str (offset)], + stdin = DEVNULL, + stdout = PIPE + ) + return (read_json (p.communicate () [0])) + except Exception: + pass + + def send_sms (self, dest, message): + p = Popen ( + [sms_send_program, '-n', dest.replace (' ', '')], + stdin = PIPE, + stdout = DEVNULL + ) + p.communicate (message.encode ('utf-8')) + + def sorted_list (self): + return (sorted (self.sms_list, key = lambda item: item ['received'])) + + def thread_list (self): + l = {} + for i in self.sms_list: + if i ['threadid'] in l: + if l [i ['threadid']] [1] < i ['received']: + l [i ['threadid']] [1] = i ['received'] + l [i ['threadid']] [2] = i ['body'] + l [i ['threadid']] [3] += 1 + else: + if 'sender' in i: + l [i ['threadid']] = [i ['number'], i ['received'], i ['body'], 1, i ['sender']] + else: + l [i ['threadid']] = [i ['number'], i ['received'], i ['body'], 1] + return (l) + + def get_thread (self, id): + l = [] + number = None + name = None + for i in self.sms_list: + if str (i ['threadid']) == str (id): + l.append ((i ['received'], True if i ['type'] == 'inbox' else False, i ['body'])) + if number is None: + number = i ['number'] + if 'sender' in i: + name = i ['sender'] + return (number, name, sorted (l, key = lambda item: item [0])) + + +def contact_list (): + p = Popen ( + [contacts_list_program, ], + stdin = DEVNULL, + stdout = PIPE + ) + return (read_json (p.communicate () [0])) + + +class Web_Service (BaseHTTPRequestHandler): + def do_GET (self): + global locked_client + client = self.client_address [0] + if locked_client is None: + locked_client = client + print ('Locking on ' + str (client) + '…') + if locked_client == client: + global sms + if self.path == '/': + self.send_response (200) + self.send_header ('Content-type', 'text/html; charset=UTF-8') + self.end_headers () + ls = sms.thread_list () + lst = [] + for i in sorted (ls, key = lambda item: ls [item] [1], reverse = True): + s = ls [i] [0] + ' (' + str (ls [i] [3]) + ')' + if len (ls [i]) == 5: + s = ls [i] [4] + ' (' + str (ls [i] [3]) + ')' + lst.append ([html_link ('/?thread=' + str (i), text = s), ls [i] [1], ls [i] [2] [0:50]]) + self.wfile.write (( + html_header (title = 'Conversations list') + + html_title ('Conversation') + + html_paragraph (html_link ('/new', text = 'New conversation')) + + html_paragraph (html_link ('/', text = 'Refresh list')) + + html_paragraph ('List of conversations follows:') + + html_table (lst) + + html_footer () + ).encode ('utf-8')) + elif self.path == '/new': + self.send_response (200) + self.send_header ('Content-type', 'text/html; charset=UTF-8') + self.end_headers () + ls = contact_list () + lst = [] + for i in sorted (ls, key = lambda item: item ['name']): + lst.append (html_link ('/new?num=' + quote (i ['number']), text = i ['name'] + ' (' + i ['number'] + ')')) + self.wfile.write (( + html_header (title = 'New conversation') + + html_paragraph (html_link ('/', text = '← Back to list')) + + html_title ('Create a new conversation') + + html_paragraph ('Put number here:') + + html_form ( + html_form_text ('num') + + html_form_submit ('Validate'), + '/new', + post = False + ) + + html_paragraph ('List of contacts follows:') + + html_list (lst) + + html_footer () + ).encode ('utf-8')) + elif self.path [0:9] == '/new?num=': + self.send_response (200) + self.send_header ('Content-type', 'text/html; charset=UTF-8') + self.end_headers () + number = self.path [9:] + self.wfile.write (( + html_header (title = 'New conversation') + + html_paragraph (html_link ('/', text = '← Back to list')) + + html_paragraph (html_link ('/new', text = '← Back to new message')) + + html_title ('Create a new conversation with ' + unquote (number)) + + html_paragraph ('Message:') + + html_form ( + html_form_text ('msg', textarea = True) + + html_form_hidden ('number', unquote (number)) + + html_form_submit ('Send'), + '/send' + ) + + html_footer () + ).encode ('utf-8')) + elif self.path [0:9] == '/?thread=': + try: + id = self.path [9:] + number, name, conv = sms.get_thread (id) + if number is None: + raise (IndexError) + except (ValueError, IndexError): + self.send_response (404) + self.send_header ('Content-type', 'text/html') + self.end_headers () + self.wfile.write (( + html_header (title = 'Page not found') + + html_title ('Not found') + + html_paragraph ('This page does not exist. You cas return to the ' + html_link ('/', text = 'home') + '.') + + html_footer () + ).encode ('utf-8')) + return () + self.send_response (200) + self.send_header ('Content-type', 'text/html; charset=UTF-8') + self.end_headers () + t = [] + for i in sorted (conv, key = lambda item: item [0], reverse = True) [0:5000]: + if i [1]: + t.append ([i [2], '↓ ' + i [0]]) + else: + t.append (['↑ ' + i [0], i [2]]) + if name is None: + name = number + else: + name = name + ' (' + number + ')' + self.wfile.write (( + html_header (title = 'Conversation with ' + name) + + html_paragraph (html_link ('/', text = '← Back to list')) + + html_title (name) + + html_form ( + html_form_text ('msg', textarea = True) + + html_form_hidden ('number', number) + + html_form_submit ('Send'), + '/send' + ) + + html_link (self.path, text = 'Refresh') + + html_table (t, table_style = 'table-layout: fixed; border: 1px solid black', cell_style = 'width: 500px; border: 1px solid black') + + html_footer () + ).encode ('utf-8')) + else: + self.send_response (404) + self.send_header ('Content-type', 'text/html') + self.end_headers () + self.wfile.write (( + html_header (title = 'Page not found') + + html_title ('Not found') + + html_paragraph ('This page does not exist. You cas return to the ' + html_link ('/', text = 'home') + '.') + + html_footer () + ).encode ('utf-8')) + else: + self.send_response (403) + self.send_header ('Content-type', 'text/html') + self.end_headers () + self.wfile.write (( + html_header (title = 'Access denied') + + html_title ('Access denied') + + html_paragraph ('Access have allready been granted to another host.') + + html_footer () + ).encode ('utf-8')) + + def do_POST (self): + global locked_client + client = self.client_address [0] + if locked_client is None: + locked_client = client + print ('Locking on ' + str (client) + '…') + if locked_client == client: + global sms + if self.path == '/send': + post_data = FieldStorage (fp = self.rfile, headers = self.headers, environ = {'REQUEST_METHOD': 'POST'}) + sms.send_sms (post_data.getvalue ('number'), post_data.getvalue ('msg')) + self.send_response (301) + self.send_header ('Location', self.headers.get ('Referer')) + self.end_headers () + else: + self.send_response (404) + self.send_header ('Content-type', 'text/html') + self.end_headers () + self.wfile.write (( + html_header (title = 'Page not found') + + html_title ('Not found') + + html_paragraph ('This page does not exist. You cas return to the ' + html_link ('/', text = 'home') + '.') + + html_footer () + ).encode ('utf-8')) + else: + self.send_response (403) + self.send_header ('Content-type', 'text/html') + self.end_headers () + self.wfile.write (( + html_header (title = 'Access denied') + + html_title ('Access denied') + + html_paragraph ('Access have allready been granted to another host.') + + html_footer () + ).encode ('utf-8')) + + +if __name__ == '__main__': + sms = SMS_Service () + server = HTTPServer (('::', 8080), Web_Service) + locked_client = None + print ('Started!') + try: + sms.start () + server.serve_forever () + except KeyboardInterrupt: + print ('Interrupt received, closing process…') + server.server_close () + exit (0) \ No newline at end of file