Moving search to Qwant

This commit is contained in:
Adriene Hutchins 2020-03-24 19:09:01 -04:00
parent 19acae2abc
commit 391beaf422
6 changed files with 422 additions and 229 deletions

View file

@ -10,6 +10,8 @@ from discord.ext import commands
import aiohttp
import random
from typing import List
from extensions.models import SearchExceptions
import html2text
class Search(commands.Cog, name="Basic"):
@ -21,193 +23,153 @@ class Search(commands.Cog, name="Basic"):
self.bot = bot
self.info = bot.logging.info
self.warn = bot.logging.warn
self.debug = bot.logging.debug
self.request = bot.request
self.emoji = "\U0001F50D"
self.scrape_token = bot.config['SCRAPESTACK']
# Get Instances
with open('searxes.txt') as f:
self.instances = f.read().split('\n')
# Markdown converter
self.tomd = html2text.HTML2Text()
self.tomd.ignore_links = True
self.tomd.ignore_images = True
self.tomd.ignore_tables = True
self.tomd.ignore_emphasis = True
self.tomd.body_width = 0
async def _search_logic(self, query: str, is_nsfw: bool = False,
category: str = None) -> str:
"""Provides search logic for all search commands."""
category: str = 'web', count: int = 5) -> list:
"""Uses scrapestack and the Qwant API to find search results."""
# Typing
base: str
safesearch: str
# NSFW Filtering
# WARNING - This list includes slurs.
# WARNING This list includes slurs.
nono_words = [
'tranny', 'faggot', 'fag',
'porn', 'cock', 'dick',
'titty', 'boob', 'penis',
'slut', 'cum', 'jizz',
'semen', 'cooch', 'coochie',
'pussy', 'penis', 'fetish',
'bdsm', 'sexy', 'xxx',
'orgasm', 'masturbation',
'erotic', 'creampie',
'fap', 'nude', 'orgasm',
'squirting', 'yiff',
'e621'
]
nono_sites = [
'xvideos', 'pornhub',
'xhamster', 'xnxx',
'youporn', 'xxx',
'freexcafe', 'sex.com',
'e621'
'tranny', 'faggot', 'fag', 'porn', 'cock', 'dick',
'titty', ' tit ', 'boob', 'penis', 'slut', ' cum ', 'jizz',
'semen', 'cooch', 'coochie', 'pussy', 'penis', 'fetish',
'bdsm', 'sexy', 'xxx', 'orgasm', 'masturbat',
'erotic', 'creampie', 'fap', 'nude', 'orgasm',
'squirting', 'yiff', 'e621', ' sex', 'ejaculat',
'cunt', 'vagina', 'coom', 'troon', 'hentai', 'yaoi',
'bukkake', 'bara', 'shota', 'loli', 'fetish', 'spunk',
'pron', 'p0rn', 'pr0n', 'gloryhole', 'felch', 'skullfuck',
'scat', 'pissplay', 'piss play', 'underage', 'bbw',
'fisting', 'queef', "rimming", 'rimjob', 'bdsm',
'cbt', 'blumpkin', 'boner', 'prostitut', 'butt plug',
'transvestite', 'femboy', 'castrat', 'philia', 'edging',
'edgeplay', 'enema', 'facial', 'fellat', 'femdom', 'footjob',
'blowjob', 'titjob', 'handjob', 'frot', 'gang bang', 'gangbang',
'glory hole', 'hermap', 'jerk off', 'jerking off', 'jack off',
'jacking off', 'kink', 'wet dream', 'anal', 'pegging', 'precum',
'pre-cum', 'pre cum', 'priap', 'scrotum', 'shemale', 'smegma',
'smut', 'softcore', 'transsexual', 'voyeur', 'viagra', 'wank',
'whore'
]
if not is_nsfw:
for i in nono_words:
if i in query.replace(" ", ""):
return (
"**Sorry!** That query included language "
"we cannot accept in a non-NSFW channel. "
"Please try again in an NSFW channel."
)
if any(n in query for n in nono_words):
raise SearchExceptions.SafesearchFail('Query had NSFW.')
# Choose an instance
if self.instances == []:
with open('searxes.txt') as f:
self.instances = f.read().split('\n')
instance = random.sample(self.instances, k=1)[0]
# Scrape or not
# if self.scrape_token != '':
# base = (
# "http://api.scrapestack.com/scrape"
# f"?access_key={self.scrape_token}"
# f"&url=https://api.qwant.com/api"
# )
# print(base)
# else:
base = "https://api.qwant.com/api"
# Error Template
error_msg = (
"**An error occured!**\n\n"
f"There was a problem with `{instance}`. Please try again later.\n"
f"_If problems with this instance persist, contact`{self.bot.appinfo.owner}` to have it removed._"
# Safesearch
if is_nsfw:
safesearch = "0"
else:
safesearch = "2"
# Search URL Building
# api.qwant.com/api/search/web?count=5&q=test&safesearch=2&...
search_url = (
f"{base}/search/{category}"
f"?count={count}"
f"&q={query}"
f"&safesearch={safesearch}"
"&t=web"
"&locale=en_US"
"&uiv=4"
)
await self.debug(search_url, name="_search_logic")
# Searching
headers = {
'User-Agent': (
'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:74.0)'
' Gecko/20100101 Firefox/74.0'
)
}
async with self.request.get(search_url, headers=headers) as resp:
to_parse = await resp.json()
# Sends results
return to_parse['data']['result']['items']
async def _basic_search(self, ctx, query: str, category: str = 'web'):
"""Basic search formatting."""
# NOTE Customizable count not yet implemented.
count: int = 5
# Safesearch variable
is_nsfw = (
ctx.channel.is_nsfw() if hasattr(ctx.channel, 'is_nsfw')
else False
)
# Create the URL to make an API call to
call = f'{instance}search?q={query}&format=json&language=en-US'
# If a type is provided, add that type to the call URL
if category:
call += f'&categories={category}'
if is_nsfw:
call += '&safesearch=0'
else:
call += '&safesearch=1'
# Figure out engines for different categories to get decent results.
if category == 'videos':
call += '&engines=bing+videos,google+videos'
# Make said API call
try:
async with self.request.get(call) as resp:
response = await resp.json()
except aiohttp.ClientError:
return error_msg
# Split our response data up for parsing
# infoboxes = response['infoboxes']
results = response['results']
# Create message with results
try:
# Handle tiny result count
if len(results) > 5:
amt = 5
else:
amt = len(results)
# Remove no-no sites
if not is_nsfw:
for r in results[0:7]:
for n in nono_sites:
if n in r['url']:
results.remove(r)
# Escape stuff
query = discord.utils.escape_mentions(query)
query = discord.utils.escape_markdown(query)
# Header
msg = f"Showing **{amt}** results for `{query}`. \n\n"
# Expanded Result
msg += (
f"**{results[0]['title']}** <{results[0]['url']}>\n"
f"{results[0]['content']}\n\n")
# Other Results
msg += "\n".join(
[f"**{entry['title']}** <{entry['url']}>" for entry in results[1:5]])
# Instance Info
msg += f"\n\n_Results retrieved from instance `{instance}`._"
return msg
# Reached if error with returned results
except (KeyError, IndexError) as e:
# Logging
await self.warn(
f"A user encountered a(n) `{e}` with <{instance}> when searching for `{query}`. "
"Consider removing it or looking into it.",
name="Failed Instance"
)
self.instances.remove(instance) # Weed the instance out
# Recurse until good response
return await self._search_logic(query, is_nsfw)
async def _instance_check(self, instance: str, content: dict) -> bool:
"""Checks the quality of an instance."""
# Makes sure proper values exist
if 'error' in content:
return False
if not ('engines' in content and 'initial' in content['timing']):
return False
if not ('google' in content['engines'] and 'enabled' in content['engines']['google']):
return False
# Makes sure google is enabled
if not content['engines']['google']['enabled']:
return False
# Makes sure is not Tor
if content['network_type'] != 'normal':
return False
# Only picks instances that are fast enough
timing = int(content['timing']['initial'])
if timing > 0.20:
return False
# Check for Google captcha
test_search = f'{instance}/search?q=test&format=json&lang=en-US'
try:
async with self.request.get(test_search) as resp:
response = await resp.json()
response['results'][0]['content']
except (aiohttp.ClientError, KeyError, IndexError):
return False
# Reached if passes all checks
return True
async def _basic_search(self, ctx, query: str,
category: str = None):
"""Base search message generation."""
# Handling
async with ctx.typing():
is_nsfw = (
ctx.channel.is_nsfw() if hasattr(ctx.channel, 'is_nsfw')
else False
# Searches
results = await self._search_logic(query, is_nsfw, category)
count = len(results)
# Escapes all nasties for displaying
query_display = discord.utils.escape_mentions(query)
query_display = discord.utils.escape_markdown(query_display)
# Return if no results
try:
results[0]
except IndexError:
return await ctx.send(
f"No results found for `{query_display}`."
)
# Gets the first entry's stuff
first_title = self.tomd.handle(results[0]['title']).rstrip('\n')
first_url = results[0]['url']
first_desc = self.tomd.handle(results[0]['desc']).rstrip('\n')
# Builds the substring for each of the other results.
other_results: List[str] = []
for r in results[1:count]:
title = self.tomd.handle(r['title']).rstrip('\n')
url = results[0]['url']
other_results.append(f"**{title}** <{url}>")
other_msg: str = "\n".join(other_results)
# Builds message
msg = (
f"Showing **{count}** results for `{query_display}`.\n\n"
f"**{first_title}** <{first_url}>\n{first_desc}\n\n"
f"{other_msg}\n\n_Powered by Qwant._"
)
msg = await self._search_logic(query, is_nsfw, category)
# Sends message
await self.debug(msg, name="_basic_search")
await ctx.send(msg)
await self.info(
content=(
f"**{ctx.author}** searched for `{query}` "
f"in \"{ctx.guild}\" and got this:"
f"\n\n{msg}"
),
name="Search Results"
)
@commands.command()
async def search(self, ctx, *, query: str):
"""Search online for general results."""
@ -250,51 +212,27 @@ class Search(commands.Cog, name="Basic"):
await self._basic_search(ctx, query, 'maps')
@commands.command()
@commands.is_owner()
async def rejson(self, ctx):
"""Refreshes the list of instances for searx."""
msg = await ctx.send('<a:updating:403035325242540032> Refreshing instance list...\n\n'
'(Due to extensive quality checks, this may take a bit.)')
plausible: List[str] = []
# Get, parse, and quality check all instances
async with self.request.get('https://searx.space/data/instances.json') as r:
# Parsing
searx_json = await r.json()
instances = searx_json['instances']
# Quality Check
for i in instances:
content = instances.get(i)
is_good: bool = await self._instance_check(i, content)
if is_good:
plausible.append(i)
# Save new list
self.instances = plausible
with open('searxes.txt', 'w') as f:
f.write('\n'.join(plausible))
await msg.edit(content='Instances refreshed!')
@commands.Cog.listener()
async def on_command_error(self, ctx, error):
"""Listener makes no command fallback to searching."""
if isinstance(error, commands.CommandNotFound) or \
isinstance(error, commands.CheckFailure):
fallback = (commands.CommandNotFound, commands.CheckFailure)
# Handling
async with ctx.typing():
# Prepares term
term = ctx.message.content.replace(ctx.prefix, '', 1)
term = term.lstrip(' ')
# Does search
await self._basic_search(ctx, term)
if isinstance(error, fallback):
try:
await self._basic_search(
ctx, ctx.message.content[len(ctx.prefix):]
)
except SearchExceptions.SafesearchFail:
await ctx.send(
"**Sorry!** That query included language "
"we cannot accept in a non-NSFW channel. "
"Please try again in an NSFW channel."
)
except Exception as e:
print(e)
def setup(bot):
bot.add_cog(Search(bot))