- # Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
- # License: MIT. See LICENSE
-
- import frappe
- from frappe import _, msgprint
- from frappe.utils import cint, get_url, now_datetime
- from frappe.utils.data import getdate
- from frappe.utils.verified_command import get_signed_params, verify_request
-
-
- def get_emails_sent_this_month(email_account=None):
- """Get count of emails sent from a specific email account.
-
- :param email_account: name of the email account used to send mail
-
- if email_account=None, email account filter is not applied while counting
- """
- today = getdate()
- month_start = today.replace(day=1)
-
- filters = {
- "status": "Sent",
- "creation": [">=", str(month_start)],
- }
- if email_account:
- filters["email_account"] = email_account
-
- return frappe.db.count("Email Queue", filters=filters)
-
-
- def get_emails_sent_today(email_account=None):
- """Get count of emails sent from a specific email account.
-
- :param email_account: name of the email account used to send mail
-
- if email_account=None, email account filter is not applied while counting
- """
- q = """
- SELECT
- COUNT(`name`)
- FROM
- `tabEmail Queue`
- WHERE
- `status` in ('Sent', 'Not Sent', 'Sending')
- AND
- `creation` > (NOW() - INTERVAL '24' HOUR)
- """
-
- q_args = {}
- if email_account is not None:
- if email_account:
- q += " AND email_account = %(email_account)s"
- q_args["email_account"] = email_account
- else:
- q += " AND (email_account is null OR email_account='')"
-
- return frappe.db.sql(q, q_args)[0][0]
-
-
- def get_unsubscribe_message(
- unsubscribe_message: str, expose_recipients: str
- ) -> "frappe._dict[str, str]":
- unsubscribe_message = unsubscribe_message or _("Unsubscribe")
- unsubscribe_link = f'<a href="<!--unsubscribe_url-->" target="_blank">{unsubscribe_message}</a>'
- unsubscribe_html = _("{0} to stop receiving emails of this type").format(unsubscribe_link)
- html = f"""<div class="email-unsubscribe">
- <!--cc_message-->
- <div>
- {unsubscribe_html}
- </div>
- </div>"""
-
- text = f"\n\n{unsubscribe_message}: <!--unsubscribe_url-->\n"
- if expose_recipients == "footer":
- text = f"\n<!--cc_message-->{text}"
-
- return frappe._dict(html=html, text=text)
-
-
- def get_unsubcribed_url(
- reference_doctype, reference_name, email, unsubscribe_method, unsubscribe_params
- ):
- params = {
- "email": email.encode("utf-8"),
- "doctype": reference_doctype.encode("utf-8"),
- "name": reference_name.encode("utf-8"),
- }
- if unsubscribe_params:
- params.update(unsubscribe_params)
-
- query_string = get_signed_params(params)
-
- # for test
- frappe.local.flags.signed_query_string = query_string
-
- return get_url(unsubscribe_method + "?" + get_signed_params(params))
-
-
- @frappe.whitelist(allow_guest=True)
- def unsubscribe(doctype, name, email):
- # unsubsribe from comments and communications
- if not verify_request():
- return
-
- try:
- frappe.get_doc(
- {
- "doctype": "Email Unsubscribe",
- "email": email,
- "reference_doctype": doctype,
- "reference_name": name,
- }
- ).insert(ignore_permissions=True)
-
- except frappe.DuplicateEntryError:
- frappe.db.rollback()
-
- else:
- frappe.db.commit()
-
- return_unsubscribed_page(email, doctype, name)
-
-
- def return_unsubscribed_page(email, doctype, name):
- frappe.respond_as_web_page(
- _("Unsubscribed"),
- _("{0} has left the conversation in {1} {2}").format(email, _(doctype), name),
- indicator_color="green",
- )
-
-
- def flush(from_test=False):
- """flush email queue, every time: called from scheduler"""
- from frappe.email.doctype.email_queue.email_queue import send_mail
- from frappe.utils.background_jobs import get_jobs
-
- # To avoid running jobs inside unit tests
- if frappe.are_emails_muted():
- msgprint(_("Emails are muted"))
- from_test = True
-
- if cint(frappe.db.get_default("suspend_email_queue")) == 1:
- return
-
- try:
- queued_jobs = set(get_jobs(site=frappe.local.site, key="job_name")[frappe.local.site])
- except Exception:
- queued_jobs = set()
-
- for row in get_queue():
- try:
- job_name = f"email_queue_sendmail_{row.name}"
- if job_name not in queued_jobs:
- frappe.enqueue(
- method=send_mail,
- email_queue_name=row.name,
- is_background_task=not from_test,
- now=from_test,
- job_name=job_name,
- queue="short",
- )
- else:
- frappe.logger().debug(f"Not queueing job {job_name} because it is in queue already")
- except Exception:
- frappe.get_doc("Email Queue", row.name).log_error()
-
-
- def get_queue():
- return frappe.db.sql(
- """select
- name, sender
- from
- `tabEmail Queue`
- where
- (status='Not Sent' or status='Partially Sent') and
- (send_after is null or send_after < %(now)s)
- order
- by priority desc, creation asc
- limit 500""",
- {"now": now_datetime()},
- as_dict=True,
- )
-
-
- def set_expiry_for_email_queue():
- """Mark emails as expire that has not sent for 7 days.
- Called daily via scheduler.
- """
-
- frappe.db.sql(
- """
- UPDATE `tabEmail Queue`
- SET `status`='Expired'
- WHERE `modified` < (NOW() - INTERVAL '7' DAY)
- AND `status`='Not Sent'
- AND (`send_after` IS NULL OR `send_after` < %(now)s)""",
- {"now": now_datetime()},
- )
This file has been truncated. show original