#!/data/data/com.termux/files/usr/bin/python3
from cgi import FieldStorage
from html import escape
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer as http4_server_do_not_use
from json import loads as read_json
from os import kill, getpid
from socket import AF_INET6
from subprocess import Popen, DEVNULL, PIPE
from urllib.parse import quote, unquote, unquote_plus

from sms import SMS_Service
# Absolute path of the `termux-contact-list` executable: contact_list () runs it
# and parses what it writes on its standard output as JSON.
contacts_list_program = '/data/data/com.termux/files/usr/bin/termux-contact-list'
class HTTPServer(http4_server_do_not_use):
    '''
    Threaded HTTP server listening on an IPv6 socket.

    This is not IPv6 only: the listening socket is switched to dual-stack
    mode before it is bound, so the server also accepts IPv4 clients.
    '''
    address_family = AF_INET6

    def server_bind(self):
        '''Clear IPV6_V6ONLY on the listening socket, then bind it as usual.'''
        import socket

        v6only = getattr(socket, 'IPV6_V6ONLY', None)
        if v6only is not None:
            try:
                # Without this the acceptance of IPv4 clients depends on the
                # system default for IPV6_V6ONLY.
                self.socket.setsockopt(socket.IPPROTO_IPV6, v6only, 0)
            except OSError:
                # Best effort: keep the platform default when the option
                # cannot be changed.
                pass
        super().server_bind()
def html_header(title='No title'):
    '''
    Return the beginning of an HTML page, up to and including ``<body>``.

    title is plain text; it is HTML-escaped before being placed in the
    ``<title>`` element.
    '''
    lines = [
        '<!DOCTYPE html>',
        '<html>',
        '<head>',
        '\t<meta charset="utf-8">',
        '\t<title>' + escape(title) + '</title>',
        # Percentage heights (used for the iframes) only resolve when the
        # ancestors of the element have a height themselves.
        '\t<style>html, body { height: 100%; }</style>',
        '</head>',
        '<body>',
    ]
    return '\n'.join(lines)
def html_title(text):
    '''
    Return a top-level heading.

    text is plain text; it is HTML-escaped, so a contact name or a number
    taken from the URL cannot inject markup.
    '''
    return '\n\t<h1>' + escape(text) + '</h1>'
def html_iframe(src, height='150', width='300', name=None, style='border:none;'):
    '''
    Return an ``<iframe>`` element.

    src    -- address loaded in the frame
    height -- value of the height attribute
    width  -- value of the width attribute
    name   -- frame name usable as a link target, omitted when None
    style  -- inline CSS, omitted when None

    Every attribute value is HTML-escaped.
    '''
    attributes = (
        ('src', src),
        ('height', height),
        ('width', width),
        ('name', name),
        ('style', style),
    )
    rendered = ''.join(
        ' ' + key + '="' + escape(str(value)) + '"'
        for key, value in attributes
        if value is not None
    )
    return '<iframe' + rendered + '></iframe>'
def html_paragraph(text):
    '''
    Return a paragraph wrapping text.

    text is inserted as is because callers pass markup (links) as well as
    plain sentences; it must already be safe HTML.
    '''
    return '\n\t<p>' + text + '</p>'
def html_list(lst):
    '''
    Return an unordered list with one item per element of lst.

    The elements are inserted as is (callers pass links), so they must
    already be safe HTML.
    '''
    items = ''.join('\n\t\t<li>' + line + '</li>' for line in lst)
    return '\n\t<ul>' + items + '\n\t</ul>'
def html_link(url, text=None, target='_self'):
    '''
    Return a hyperlink.

    url    -- address of the link
    text   -- visible plain text, defaults to url
    target -- name of the frame the link opens in

    The three values are HTML-escaped.
    '''
    if text is None:
        text = url
    return (
        '<a href="' + escape(url) + '" target="' + escape(target) + '">'
        + escape(text) + '</a>'
    )
def html_table(body, table_style=None, cell_style=None):
    '''
    Return a table.

    body        -- iterable of rows, each row being an iterable of cells;
                   cells are inserted as is and must already be safe HTML
    table_style -- inline CSS of the table, omitted when None
    cell_style  -- inline CSS applied to every cell, omitted when None
    '''
    def style_attribute(style):
        return '' if style is None else ' style="' + escape(style) + '"'

    cell_open = '\n\t\t\t<td' + style_attribute(cell_style) + '>'
    parts = ['\n\t<table' + style_attribute(table_style) + '>']
    for line in body:
        parts.append('\n\t\t<tr>')
        parts.extend(cell_open + cell + '</td>' for cell in line)
        parts.append('\n\t\t</tr>')
    parts.append('\n\t</table>')
    return ''.join(parts)
def html_message(msg, sent, time, name, avatar=None):
    '''
    Return the block displaying one message of a conversation.

    msg    -- text of the message; escaped, line breaks become ``<br>``
    sent   -- true for a message sent from this device; selects the CSS
              class ``sent`` instead of ``received``
    time   -- date of the message, converted with str () and escaped
    name   -- short label of the author, used as alternative text of the
              avatar (or displayed as text when there is no avatar)
    avatar -- address of a picture, or None

    The message text comes from the outside world, hence the escaping of
    every value.
    '''
    direction = 'sent' if sent else 'received'
    if avatar is None:
        author = '<span class="name">' + escape(name) + '</span>'
    else:
        author = '<img src="' + escape(avatar) + '" alt="' + escape(name) + '">'
    # Escape first, then add the line breaks, so that only our own <br> survive
    text = escape(msg).replace('\n', '<br>')
    return (
        '\n\t<div class="message ' + direction + '">'
        + '\n\t\t' + author
        + '\n\t\t<p>' + text + '</p>'
        + '\n\t\t<span class="time">' + escape(str(time)) + '</span>'
        + '\n\t</div>'
    )
def html_form(body, url, post=True):
    '''
    Return a form.

    body -- markup of the fields, inserted as is
    url  -- address the form is submitted to (escaped)
    post -- submit with POST when true, with GET otherwise
    '''
    method = 'post' if post else 'get'
    return (
        '\n\t<form action="' + escape(url) + '" method="' + method + '">'
        + body
        + '\n\t</form>'
    )
def html_form_text(id, value='', textarea=False):
    '''
    Return a text field.

    id       -- used both as the id and as the name of the field, so it is
                also the key under which the value is submitted
    value    -- initial content (escaped)
    textarea -- multi-line ``<textarea>`` when true, single-line
                ``<input>`` otherwise
    '''
    field = escape(id)
    if textarea:
        return (
            '\n\t<textarea id="' + field + '" name="' + field + '">'
            + escape(value) + '</textarea>'
        )
    return (
        '\n\t<input type="text" id="' + field + '" name="' + field
        + '" value="' + escape(value) + '">'
    )
def html_form_hidden(id, value):
    '''
    Return a hidden form field.

    id is used both as the id and as the name of the field. Both values are
    escaped: the value can come straight from the requested URL.
    '''
    field = escape(id)
    return (
        '\n\t<input type="hidden" id="' + field + '" name="' + field
        + '" value="' + escape(value) + '">'
    )
def html_form_submit(label):
    '''Return a submit button showing label (escaped).'''
    return '\n\t<input type="submit" value="' + escape(label) + '">'
def html_footer():
    '''Return the end of an HTML page: the closing body and html tags.'''
    return '\n</body>\n</html>'
def contact_list(command=None):
    '''
    Run the contact listing program and return its output parsed as JSON.

    command -- argument list of the program to run; defaults to
               contacts_list_program without argument

    Returns whatever the JSON document printed by the program decodes to,
    or an empty list when the program prints nothing. Output that is not
    valid JSON still raises the error of the JSON decoder.
    '''
    if command is None:
        command = [contacts_list_program]
    # The context manager closes the pipe and waits for the child process
    with Popen(command, stdin=DEVNULL, stdout=PIPE) as process:
        output = process.communicate()[0]
    if not output.strip():
        return []
    return read_json(output)
class Web_Service(BaseHTTPRequestHandler):
    '''
    HTTP request handler of the web interface.

    Only one client address is served: the address of the first request is
    stored in the module-level ``locked_client`` and a request coming from
    any other address gets a 403 page. Pages are assembled with the
    ``html_*`` functions of this module; conversations are read from, and
    messages sent through, the module-level ``sms`` object.
    '''

    # SVG path data of the two arrow pictures, indexed by the path they are served at
    _ARROWS = {
        '/up.svg': 'M12 20 V4 M5 11 L12 4 L19 11',
        '/down.svg': 'M12 4 V20 M5 13 L12 20 L19 13',
    }

    def do_GET(self):
        '''Serve the pages and pictures of the interface.'''
        if not self._client_allowed():
            self._send_forbidden()
            return
        path = self.path
        if path == '/':
            self._page_home()
        elif path in self._ARROWS:
            self._send_arrow(self._ARROWS[path])
        elif path == '/list':
            self._page_list()
        elif path == '/new':
            self._page_contacts()
        elif path.startswith('/new?num='):
            self._page_new_message(path[len('/new?num='):])
        elif path.startswith('/?thread='):
            self._page_thread(path[len('/?thread='):])
        elif path.startswith('/messages?thread='):
            self._page_messages(path[len('/messages?thread='):])
        else:
            self._send_not_found()

    def do_POST(self):
        '''Send the message posted to /send, then redirect to the referring page.'''
        if not self._client_allowed():
            self._send_forbidden()
            return
        if self.path != '/send':
            self._send_not_found()
            return
        post_data = FieldStorage(fp=self.rfile, headers=self.headers, environ={'REQUEST_METHOD': 'POST'})
        number = post_data.getvalue('number')
        msg = post_data.getvalue('msg')
        if not number or not msg:
            # An absent or empty field is reported instead of being handed over as None
            self._send_html(
                html_header(title='Bad request') +
                html_title('Bad request') +
                html_paragraph('The number and the message are both required.') +
                html_footer(),
                status=400
            )
            return
        sms.send_sms(number, msg)
        # 303 makes the browser fetch the target with GET and, unlike 301,
        # does not declare the redirection permanent.
        self.send_response(303)
        # The Referer header is optional: fall back to the home page
        self.send_header('Location', self.headers.get('Referer') or '/')
        self.end_headers()

    def _client_allowed(self):
        '''Lock on the first client seen and tell whether the current one is that client.'''
        global locked_client
        client = self.client_address[0]
        if locked_client is None:
            locked_client = client
            print('Locking on ' + str(client) + '…')
        return locked_client == client

    def _send_html(self, page, status=200, refresh=None):
        '''Send page as a UTF-8 HTML response, with a Refresh header when refresh is given.'''
        self.send_response(status)
        self.send_header('Content-type', 'text/html; charset=UTF-8')
        if refresh is not None:
            self.send_header('Refresh', str(refresh))
        self.end_headers()
        self.wfile.write(page.encode('utf-8'))

    def _send_not_found(self):
        '''Send the 404 page.'''
        self._send_html(
            html_header(title='Page not found') +
            html_title('Not found') +
            html_paragraph('This page does not exist. You cas return to the ' + html_link('/', text='home') + '.') +
            html_footer(),
            status=404
        )

    def _send_forbidden(self):
        '''Send the 403 page shown to every client but the locked one.'''
        self._send_html(
            html_header(title='Access denied') +
            html_title('Access denied') +
            html_paragraph('Access have allready been granted to another host.') +
            html_footer(),
            status=403
        )

    def _send_arrow(self, path_data):
        '''Send an SVG picture drawing path_data as a black stroke.'''
        self.send_response(200)
        self.send_header('Content-type', 'image/svg+xml; charset=UTF-8')
        self.end_headers()
        self.wfile.write((
            '<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" width="24" height="24">\n' +
            '\t<g fill="none" stroke="black" stroke-width="2">\n' +
            '\t\t<path d="' + path_data + '" />\n' +
            '\t</g>\n' +
            '</svg>'
        ).encode('utf-8'))

    def _find_thread(self, thread_id):
        '''Return (number, name, messages) of a thread, or None when it does not exist.'''
        try:
            number, name, conv = sms.get_thread(thread_id)
        except (ValueError, IndexError):
            return None
        if number is None:
            return None
        return number, name, conv

    @staticmethod
    def _display_name(number, name):
        '''Return "name (number)", or the number alone when there is no name.'''
        if name is None:
            return number
        return name + ' (' + number + ')'

    def _page_home(self):
        '''Frame set: conversation list on the left, selected thread on the right.'''
        self._send_html(
            html_header(title='Termux WebSMS') +
            html_iframe('/list', height='100%', width='30%', name='list') +
            html_iframe('about:blank', height='100%', width='70%', name='thread') +
            html_footer()
        )

    def _page_list(self):
        '''List of conversations, sorted on field 1 in descending order, reloaded every 20 seconds.'''
        threads = sms.thread_list()
        links = []
        for thread_id in sorted(threads, key=lambda item: threads[item][1], reverse=True):
            entry = threads[thread_id]
            # A fifth field, when present, replaces field 0 as the label
            label = entry[4] if len(entry) == 5 else entry[0]
            links.append(html_link(
                '/?thread=' + str(thread_id),
                text=label + ' (' + str(entry[3]) + ')',
                target='thread'
            ))
        self._send_html(
            html_header(title='Conversations list') +
            html_title('Conversation') +
            html_paragraph(html_link('/new', text='New conversation', target='thread')) +
            html_paragraph(html_link('/list', text='Refresh list')) +
            html_paragraph('List of conversations follows:') +
            html_list(links) +
            html_footer(),
            refresh=20
        )

    def _page_contacts(self):
        '''Form asking for a number, followed by the contacts sorted by name.'''
        links = [
            html_link(
                '/new?num=' + quote(contact['number']),
                text=contact['name'] + ' (' + contact['number'] + ')'
            )
            for contact in sorted(contact_list(), key=lambda item: item['name'])
        ]
        self._send_html(
            html_header(title='New conversation') +
            html_title('Create a new conversation') +
            html_paragraph('Put number here:') +
            html_form(
                html_form_text('num') +
                html_form_submit('Validate'),
                '/new',
                post=False
            ) +
            html_paragraph('List of contacts follows:') +
            html_list(links) +
            html_footer()
        )

    def _page_new_message(self, quoted_number):
        '''Form used to write the first message to the number found in the URL.'''
        # Browsers submit form fields with spaces encoded as '+', which plain
        # unquote () leaves untouched; unquote_plus () decodes both encodings.
        number = unquote_plus(quoted_number)
        if not number:
            # Nothing was typed: ask again
            self._page_contacts()
            return
        self._send_html(
            html_header(title='New conversation') +
            html_paragraph(html_link('/new', text='← Back to new message')) +
            html_title('Create a new conversation with ' + number) +
            html_paragraph('Message:') +
            html_form(
                html_form_text('msg', textarea=True) +
                html_form_hidden('number', number) +
                html_form_submit('Send'),
                '/send'
            ) +
            html_footer()
        )

    def _page_thread(self, thread_id):
        '''Reply form of a conversation, above a frame showing its messages.'''
        thread = self._find_thread(thread_id)
        if thread is None:
            self._send_not_found()
            return
        number, name, _ = thread
        name = self._display_name(number, name)
        self._send_html(
            html_header(title='Conversation with ' + name) +
            html_title(name) +
            html_form(
                html_form_text('msg', textarea=True) +
                html_form_hidden('number', number) +
                html_form_submit('Send'),
                '/send'
            ) +
            html_iframe('/messages?thread=' + thread_id, height='80%', width='100%', name='messages') +
            html_footer()
        )

    def _page_messages(self, thread_id):
        '''First 2000 messages of a conversation in descending order of field 0, reloaded every 10 seconds.'''
        thread = self._find_thread(thread_id)
        if thread is None:
            self._send_not_found()
            return
        number, name, conv = thread
        latest = sorted(conv, key=lambda item: item[0], reverse=True)[0:2000]
        # Field 1 selects the arrow: down when true, up otherwise
        msgs = ''.join(
            html_message(
                item[2], not item[1], item[0],
                '↓' if item[1] else '↑',
                avatar='down.svg' if item[1] else 'up.svg'
            )
            for item in latest
        )
        self._send_html(
            html_header(title='Conversation with ' + self._display_name(number, name)) +
            html_link(self.path, text='Refresh') +
            msgs +
            html_footer(),
            refresh=10
        )
if __name__ == '__main__':
    # Address of the only client allowed to use the interface; Web_Service
    # stores it when the first request comes in.
    locked_client = None
    sms = SMS_Service()
    listen_on = ('::', 8080)
    server = HTTPServer(listen_on, Web_Service)
    print('Started!')
    try:
        sms.start()
        server.serve_forever()
    except KeyboardInterrupt:
        print('Interrupt received, closing process…')
    server.server_close()
    # BUG: the program has to kill itself to end, otherwise the user must
    # send Ctrl+C several times to close it.
    # TODO: find another way to close the program in one action.
    kill(getpid(), 15)  # 15 is SIGTERM on Linux