pleroma-relay/viera/irc_envelope.py

147 lines
4.5 KiB
Python

# irc_envelope.py
# Purpose: Conversion of RFC1459 messages to/from native objects.
#
# Copyright (c) 2014, William Pitcock <nenolod@dereferenced.org>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from pprint import pprint
class RFC1459Message(object):
@classmethod
def from_data(cls, verb, params=None, source=None, tags=None):
o = cls()
o.verb = verb
o.tags = dict()
o.source = None
o.params = list()
if params:
o.params = params
if source:
o.source = source
if tags:
o.tags.update(**tags)
return o
@classmethod
def from_message(cls, message):
if isinstance(message, bytes):
message = message.decode('UTF-8', 'replace')
s = message.split(' ')
tags = None
if s[0].startswith('@'):
tag_str = s[0][1:].split(';')
s = s[1:]
tags = {}
for tag in tag_str:
if '=' in tag:
k, v = tag.split('=', 1)
tags[k] = v
else:
tags[tag] = True
source = None
if s[0].startswith(':'):
source = s[0][1:]
s = s[1:]
verb = s[0].upper()
original_params = s[1:]
params = []
while len(original_params):
# skip multiple spaces in middle of message, as per 1459
if original_params[0] == '' and len(original_params) > 1:
original_params.pop(0)
continue
elif original_params[0].startswith(':'):
arg = ' '.join(original_params)[1:]
params.append(arg)
break
else:
params.append(original_params.pop(0))
return cls.from_data(verb, params, source, tags)
def args_to_message(self):
base = []
for arg in self.params:
casted = str(arg)
if casted and ' ' not in casted and casted[0] != ':':
base.append(casted)
else:
base.append(':' + casted)
break
return ' '.join(base)
def to_message(self):
components = []
if self.tags:
components.append('@' + ';'.join([k + '=' + v for k, v in self.tags.items()]))
if self.source:
components.append(':' + self.source)
components.append(self.verb)
if self.params:
components.append(self.args_to_message())
return ' '.join(components)
def to_event(self):
return "rfc1459 message " + self.verb, self.__dict__
def serialize(self):
return self.__dict__
def __repr__(self):
return '[RFC1459Message: "{0}"]'.format(self.to_message())
def test_rfc1459message():
print('====== PARSER TESTS ======')
print(RFC1459Message.from_message('@foo=bar PRIVMSG kaniini :this is a test message!'))
print(RFC1459Message.from_message('@foo=bar :irc.tortois.es 001 kaniini :Welcome to IRC, kaniini!'))
print(RFC1459Message.from_message('PRIVMSG kaniini :this is a test message!'))
print(RFC1459Message.from_message(':irc.tortois.es 001 kaniini :Welcome to IRC, kaniini!'))
print(RFC1459Message.from_message('CAPAB '))
print('====== STRUCTURE TESTS ======')
m = RFC1459Message.from_message('@foo=bar;bar=baz :irc.tortois.es 001 kaniini :Welcome to IRC, kaniini!')
pprint(m.serialize())
print('====== BUILDER TESTS ======')
data = {
'verb': 'PRIVMSG',
'params': ['kaniini', 'hello world!'],
'source': 'kaniini!~kaniini@localhost',
'tags': {'account-name': 'kaniini'},
}
m = RFC1459Message.from_data(**data)
print(m.to_message())
pprint(m.serialize())
print('====== ALL TESTS: PASSED ======')
if __name__ == '__main__':
test_rfc1459message()