from calendar import timegm
from datetime import datetime, timedelta
from requests import Session
import pyfilemail as pm
from pyfilemail import logger, login_required, load_config, get_configfile
from urls import get_URL
from transfer import Transfer
from errors import hellraiser, FMBaseError
[docs]class User(object):
"""This is the entry point to filemail.
If you use a registered username you'll need to provide
a password to login. If no password is passed during init a search for
password is done in $HOME/.netrc
You may also login at a later time with the :func:`User.login` function.
:param username: your email/username
:param password: filemail password if registered username is used
:type username: str
:type password: str
::
#$HOME/.netrc example:
machine yourfilemailuser@email.com
login yourfilemailuser@email.com
password topsecretpassword
"""
def __init__(self, username, password=None):
self.username = username
self.transfers = []
self.session = Session()
self.session.cookies['source'] = 'Desktop'
self.config = load_config()
apikey = self.config.get('apikey')
self.session.cookies['apikey'] = apikey
if apikey.startswith('GET KEY AT:'):
msg = 'No API KEY set in {conf}.\n{apikey}\n'
logger.warning(msg.format(conf=get_configfile(), apikey=apikey))
if password is None and pm.NETRC:
machine = pm._netrc.authenticators(username)
if machine:
password = machine[2]
else:
password = None
if password is not None:
self.login(password)
else:
self.session.cookies['logintoken'] = None
@property
def is_registered(self):
"""If user is a registered user or not.
:rtype: bool
"""
return not self.session.cookies.get('logintoken')
@property
def logged_in(self):
"""If registered user is logged in or not.
:rtype: bool
"""
return self.session.cookies.get('logintoken') and True or False
[docs] def login(self, password):
"""Login to filemail as the current user.
:param password:
:type password: ``str``
"""
method, url = get_URL('login')
payload = {
'apikey': self.config.get('apikey'),
'username': self.username,
'password': password,
'source': 'Desktop'
}
res = getattr(self.session, method)(url, params=payload)
if res.status_code == 200:
return True
hellraiser(res)
@login_required
[docs] def logout(self):
"""Logout of filemail and closing the session."""
# Check if all transfers are complete before logout
self.transfers_complete
payload = {
'apikey': self.config.get('apikey'),
'logintoken': self.session.cookies.get('logintoken')
}
method, url = get_URL('logout')
res = getattr(self.session, method)(url, params=payload)
if res.status_code == 200:
self.session.cookies['logintoken'] = None
return True
hellraiser(res)
@property
def transfers_complete(self):
"""Check if all transfers are completed."""
for transfer in self.transfers:
if not transfer.is_complete:
error = {
'errorcode': 4003,
'errormessage': 'You must complete transfer before logout.'
}
hellraiser(error)
@login_required
[docs] def get_sent(self, expired=False, for_all=False):
"""Retreve information on previously sent transfers.
:param expired: Whether or not to return expired transfers.
:param for_all: Get transfers for all users.
Requires a Filemail Business account.
:type for_all: bool
:type expired: bool
:rtype: ``list`` of :class:`pyfilemail.Transfer` objects
"""
method, url = get_URL('get_sent')
payload = {
'apikey': self.session.cookies.get('apikey'),
'logintoken': self.session.cookies.get('logintoken'),
'getexpired': expired,
'getforallusers': for_all
}
res = getattr(self.session, method)(url, params=payload)
if res.status_code == 200:
return self._restore_transfers(res)
hellraiser(res.json())
@login_required
[docs] def get_user_info(self, save_to_config=True):
"""Get user info and settings from Filemail.
:param save_to_config: Whether or not to save settings to config file
:type save_to_config: ``bool``
:rtype: ``dict`` containig user information and default settings.
"""
method, url = get_URL('user_get')
payload = {
'apikey': self.config.get('apikey'),
'logintoken': self.session.cookies.get('logintoken')
}
res = getattr(self.session, method)(url, params=payload)
if res.status_code == 200:
settings = res.json()['user']
if save_to_config:
self.config.update(settings)
return settings
hellraiser(res)
@login_required
[docs] def update_user_info(self, **kwargs):
"""Update user info and settings.
:param \*\*kwargs: settings to be merged with
:func:`User.get_configfile` setings and sent to Filemail.
:rtype: ``bool``
"""
if kwargs:
self.config.update(kwargs)
method, url = get_URL('user_update')
res = getattr(self.session, method)(url, params=self.config)
if res.status_code == 200:
return True
hellraiser(res)
@login_required
[docs] def get_received(self, age=None, for_all=True):
"""Retrieve a list of transfers sent to you or your company
from other people.
:param age: between 1 and 90 days.
:param for_all: If ``True`` will return received files for
all users in the same business. (Available for business account
members only).
:type age: ``int``
:type for_all: ``bool``
:rtype: ``list`` of :class:`Transfer` objects.
"""
method, url = get_URL('received_get')
if age:
if not isinstance(age, int) or age < 0 or age > 90:
raise FMBaseError('Age must be <int> between 0-90')
past = datetime.utcnow() - timedelta(days=age)
age = timegm(past.utctimetuple())
payload = {
'apikey': self.config.get('apikey'),
'logintoken': self.session.cookies.get('logintoken'),
'getForAllUsers': for_all,
'from': age
}
res = getattr(self.session, method)(url, params=payload)
if res.status_code == 200:
return self._restore_transfers(res)
hellraiser(res)
def _restore_transfers(self, response):
"""Restore transfers from josn retreived Filemail
:param response: response object from request
:rtype: ``list`` with :class:`Transfer` objects
"""
transfers = []
for transfer_data in response.json()['transfers']:
transfer = Transfer(self, _restore=True)
transfer.transfer_info.update(transfer_data)
transfer.get_files()
transfers.append(transfer)
return transfers
@login_required
@login_required
@login_required
@login_required
@login_required
@login_required
[docs] def get_groups(self):
"""Get contact groups
:rtype: ``list`` of ``dict`` with group data
"""
method, url = get_URL('groups_get')
payload = {
'apikey': self.config.get('apikey'),
'logintoken': self.session.cookies.get('logintoken')
}
res = getattr(self.session, method)(url, params=payload)
if res.status_code == 200:
return res.json()['groups']
hellraiser(res)
@login_required
[docs] def get_group(self, name):
"""Get contact group by name
:param name: name of group
:type name: ``str``, ``unicode``
:rtype: ``dict`` with group data
"""
groups = self.get_groups()
for group in groups:
if group['contactgroupname'] == name:
return group
msg = 'No group named: "{name}" found.'
raise FMBaseError(msg.format(name=name))
@login_required
[docs] def add_group(self, name):
"""Add new contact group
:param name: name of new group
:type name: ``str``, ``unicode``
:rtype: ``dict`` with group data
"""
method, url = get_URL('group_add')
payload = {
'apikey': self.config.get('apikey'),
'logintoken': self.session.cookies.get('logintoken'),
'name': name
}
res = getattr(self.session, method)(url, params=payload)
if res.status_code == 200:
return res.json()['groups']
hellraiser(res)
@login_required
[docs] def delete_group(self, name):
"""Delete contact group
:param name: of group
:type name: ``str``, ``unicode``
:rtype: ``bool``
"""
group = self.get_group(name)
method, url = get_URL('group_delete')
payload = {
'apikey': self.config.get('apikey'),
'logintoken': self.session.cookies.get('logintoken'),
'contactgroupid': group['contactgroupid']
}
res = getattr(self.session, method)(url, params=payload)
if res.status_code == 200:
return True
hellraiser(res)
@login_required
[docs] def rename_group(self, group, newname):
"""Rename contact group
:param group: group data or name of group
:param newname: of group
:type group: ``str``, ``unicode``, ``dict``
:type newname: ``str``, ``unicode``
:rtype: ``bool``
"""
if isinstance(group, basestring):
group = self.get_contact(group)
method, url = get_URL('group_update')
payload = {
'apikey': self.config.get('apikey'),
'logintoken': self.session.cookies.get('logintoken'),
'contactgroupid': group['contactgroupid'],
'name': newname
}
res = getattr(self.session, method)(url, params=payload)
if res.status_code == 200:
return True
hellraiser(res)
@login_required
@login_required
@login_required
[docs] def get_company_info(self):
"""Get company settings from Filemail
:rtype: ``dict`` with company data
"""
method, url = get_URL('company_get')
payload = {
'apikey': self.config.get('apikey'),
'logintoken': self.session.cookies.get('logintoken')
}
res = getattr(self.session, method)(url, params=payload)
if res.status_code == 200:
return res.json()['company']
hellraiser(res)
@login_required
[docs] def update_company(self, company):
"""Update company settings
:param company: updated settings
:type company: ``dict``
:rtype: ``bool``
"""
if not isinstance(company, dict):
raise AttributeError('company must be a <dict>')
method, url = get_URL('company_update')
payload = {
'apikey': self.config.get('apikey'),
'logintoken': self.session.cookies.get('logintoken')
}
payload.update(company)
res = getattr(self.session, method)(url, params=payload)
if res.status_code == 200:
return True
hellraiser(res)
@login_required
[docs] def get_company_users(self):
"""Get company users from Filemail
:rtype: ``list`` of ``dict`` with user data
"""
method, url = get_URL('company_get_users')
payload = {
'apikey': self.config.get('apikey'),
'logintoken': self.session.cookies.get('logintoken')
}
res = getattr(self.session, method)(url, params=payload)
if res.status_code == 200:
return res.json()['users']
hellraiser(res)
@login_required
[docs] def get_company_user(self, email):
"""Get company user based on email.
:param email: address of contact
:type email: ``str``, ``unicode``
:rtype: ``dict`` with contact information
"""
users = self.get_company_users()
for user in users:
if user['email'] == email:
return user
msg = 'No user with email: "{email}" associated with this company.'
raise FMBaseError(msg.format(email=email))
@login_required
[docs] def company_add_user(self, email, name, password, receiver, admin):
"""Add a user to the company account.
:param email:
:param name:
:param password: Pass without storing in plain text
:param receiver: Can user receive files
:param admin:
:type email: ``str`` or ``unicode``
:type name: ``str`` or ``unicode``
:type password: ``str`` or ``unicode``
:type receiver: ``bool``
:type admin: ``bool``
:rtype: ``bool``
"""
method, url = get_URL('company_add_user')
payload = {
'apikey': self.config.get('apikey'),
'logintoken': self.session.cookies.get('logintoken'),
'email': email,
'name': name,
'password': password,
'canreceivefiles': receiver,
'admin': admin
}
res = getattr(self.session, method)(url, params=payload)
if res.status_code == 200:
return True
hellraiser(res)
@login_required
[docs] def update_company_user(self, email, userdata):
"""Update a company users settings
:param email: current email address of user
:param userdata: updated settings
:type email: ``str`` or ``unicode``
:type userdata: ``dict``
:rtype: ``bool``
"""
if not isinstance(userdata, dict):
raise AttributeError('userdata must be a <dict>')
method, url = get_URL('company_update_user')
payload = {
'apikey': self.config.get('apikey'),
'logintoken': self.session.cookies.get('logintoken'),
'useremail': email
}
payload.update(userdata)
res = getattr(self.session, method)(url, params=payload)
if res.status_code == 200:
return True
hellraiser(res)