Files
diarios_oficiais_search_alems/diarios/search_service.py

849 lines
26 KiB
Python

import re
from datetime import datetime
from typing import Optional, Dict, Any, List
from elasticsearch import Elasticsearch, AsyncElasticsearch
from .schemas import BuscaDiariosResponseSchema, ResultadoSchema, PaginaSchema
import unicodedata
import asyncio
from django.conf import settings
async def is_fuzzy_appropriate(term: str) -> bool:
"""
Determina se a fuzziness é apropriada para o termo de busca.
Args:
term: Termo de busca a ser avaliado
Returns:
bool: True se fuzziness é apropriada, False caso contrário
"""
return not re.match(r"^\d+/\d+$", term.strip())
async def parse_date(date_str: Optional[str]) -> Optional[str]:
"""
Converte string de data para formato ISO para ElasticSearch.
Args:
date_str: String de data no formato YYYY-MM-DD
Returns:
Optional[str]: Data formatada ou None se inválida
"""
if not date_str:
return None
try:
dt = datetime.strptime(date_str, "%Y-%m-%d")
return dt.strftime("%Y-%m-%d")
except ValueError:
print(f"Alerta: Formato de data inválido recebido: {date_str}")
return None
async def buscar_diarios(
query: Optional[str] = None,
data_inicio: Optional[str] = None,
data_fim: Optional[str] = None,
tipo_diario: Optional[str] = None,
page: int = 1,
page_size: int = 10,
) -> Dict[str, Any]:
"""
Realiza busca nos diários oficiais com os parâmetros fornecidos.
Args:
query: Termo de busca
data_inicio: Data inicial no formato YYYY-MM-DD
data_fim: Data final no formato YYYY-MM-DD
tipo_diario: Tipo de diário a ser filtrado
page: Número da página de resultados
page_size: Quantidade de resultados por página
Returns:
Dict[str, Any]: Dicionário com resultados da busca
"""
try:
es = AsyncElasticsearch(
"http://elasticsearch:9200",
request_timeout=30,
basic_auth=(
settings.ELASTICSEARCH_USER,
settings.ELASTICSEARCH_PASSWORD,
),
)
if not await es.ping():
raise ConnectionError("Não foi possível conectar ao Elasticsearch")
except Exception as e:
print(f"Erro ao conectar com Elasticsearch: {e}")
return {"total": 0, "resultados": [], "pagina": page, "por_pagina": page_size}
es_body = {
"query": {"bool": {"must": [], "filter": []}},
"size": page_size,
"from": (page - 1) * page_size,
"_source": [
"numero",
"data",
"link",
"tipo",
"paginas.numero",
"paginas.conteudo",
],
}
parsed_dt_inicio = await parse_date(data_inicio)
parsed_dt_fim = await parse_date(data_fim)
if parsed_dt_inicio or parsed_dt_fim:
date_range_filter = {}
if parsed_dt_inicio:
date_range_filter["gte"] = parsed_dt_inicio
if parsed_dt_fim:
date_range_filter["lte"] = parsed_dt_fim
es_body["query"]["bool"]["filter"].append(
{"range": {"data": date_range_filter}}
)
if tipo_diario:
es_body["query"]["bool"]["filter"].append({"term": {"tipo.nome": tipo_diario}})
if query:
aplicar_fuzziness = await is_fuzzy_appropriate(query)
text_query_bool = {
"bool": {
"should": [
{
"match_phrase": {
"paginas.conteudo.search": {
"query": query,
"slop": 4,
"boost": 5.0,
}
}
},
{
"match": {
"paginas.conteudo.search": {
"query": query,
"fuzziness": "AUTO",
"operator": "and",
"prefix_length": 3,
}
}
},
],
"minimum_should_match": "75%",
}
}
nested_query = {
"nested": {
"path": "paginas",
"query": text_query_bool,
"inner_hits": {
"highlight": {
"fields": {"paginas.conteudo.search": {}},
"fragment_size": 500,
"number_of_fragments": 1,
"pre_tags": ["<mark>"],
"post_tags": ["</mark>"],
},
"_source": ["paginas.numero"],
},
}
}
es_body["query"]["bool"]["must"].append(nested_query)
else:
es_body["query"]["bool"]["must"].append({"match_all": {}})
# Adicionar min_score
if query:
es_body["min_score"] = 2.5 * len(query.split())
try:
response = await es.search(index="diario_oficial", body=es_body)
except Exception as e:
print(f"Erro ao executar busca no Elasticsearch: {e}")
return {"total": 0, "resultados": [], "pagina": page, "por_pagina": page_size}
finally:
await es.close()
hits = response.get("hits", {})
total = hits.get("total", {}).get("value", 0)
resultados_formatados = []
for hit in hits.get("hits", []):
source = hit.get("_source", {})
resultado_data = {
"id": hit.get("_id"),
"numero": source.get("numero", ""),
"data": source.get("data"),
"link": source.get("link", ""),
"tipo": source.get("tipo", {}).get("nome", "Sem Tipo"),
"score": hit.get("_score"),
"paginas": [],
}
pagina_match = None
if query and "inner_hits" in hit and "paginas" in hit["inner_hits"]:
paginas_inner_hits = hit["inner_hits"]["paginas"]["hits"]["hits"]
if paginas_inner_hits:
inner_hit = paginas_inner_hits[0]
inner_source = inner_hit.get("_source", {})
page_num = inner_source.get("numero", "N/A")
highlights = inner_hit.get("highlight", {}).get(
"paginas.conteudo.search", []
)
highlight_content = " ... ".join(highlights) if highlights else ""
if page_num != "N/A" and highlight_content:
pagina_match = PaginaSchema(
numero=page_num, conteudo=highlight_content
)
if pagina_match:
resultado_data["paginas"].append(pagina_match)
elif not query and "paginas" in source and source["paginas"]:
primeira_pagina_source = source["paginas"][0]
conteudo_orig = primeira_pagina_source.get("conteudo", "")
resultado_data["paginas"].append(
PaginaSchema(
numero=primeira_pagina_source.get("numero", 0),
conteudo=conteudo_orig,
)
)
resultados_formatados.append(ResultadoSchema(**resultado_data))
return {
"total": total,
"resultados": resultados_formatados,
"pagina": page,
"por_pagina": page_size,
}
async def remover_acentos(texto: str) -> str:
"""
Remove acentos de uma string para comparações mais neutras.
Args:
texto: Texto a ser normalizado
Returns:
str: Texto sem acentos
"""
return "".join(
c
for c in unicodedata.normalize("NFD", texto)
if unicodedata.category(c) != "Mn"
)
async def processar_query(query: str) -> str:
"""
Faz pré-processamento da query para separar termos como 'processonº404'.
Args:
query: Texto da consulta original
Returns:
str: Consulta processada
"""
return re.sub(r"([a-zA-Z]+)(nº|n°|no)(\d+)", r"\1 \2 \3", query)
async def montar_suggest_body(query_processada: str) -> dict:
"""
Monta o corpo da sugestão para a requisição ao Elasticsearch.
Args:
query_processada: Query processada para sugestão
Returns:
dict: Corpo da requisição para suggest
"""
return {
"suggest": {
"text": query_processada,
"correcao": {
"phrase": {
"field": "paginas.conteudo.search",
"size": 3,
"gram_size": 2,
"confidence": 0.8,
"max_errors": 4,
"highlight": {"pre_tag": "**", "post_tag": "**"},
"collate": {
"query": {
"source": {
"nested": {
"path": "paginas",
"query": {
"bool": {
"should": [
{
"match_phrase": {
"paginas.conteudo.search": {
"query": "{{suggestion}}",
"slop": 1,
}
}
}
]
}
},
}
}
},
"params": {"field_name": "paginas.conteudo.search"},
"prune": True,
},
}
},
},
"size": 0,
}
async def sugestao_termo(query: str) -> Optional[str]:
"""
Oferece sugestões de correção para a query, verificando se a sugestão
realmente retorna resultados antes de apresentá-la ao usuário.
Args:
query: Texto da consulta original
Returns:
Optional[str]: Sugestão para a consulta ou None
"""
es = await conectar_elasticsearch()
if not es:
return None
query_processada = await processar_query(query)
suggest_body = await montar_suggest_body(query_processada)
try:
response = await es.search(index="diario_oficial", body=suggest_body)
suggestions = response.get("suggest", {}).get("correcao", [])
for sug in suggestions:
for option in sug.get("options", []):
if option.get("collate_match", False):
sugestao = option["text"]
# Verifica se a sugestão é idêntica à query (ignorando acentos e case)
if sugestao.lower() == query.lower():
continue
if await remover_acentos(sugestao.lower()) == await remover_acentos(
query.lower()
):
continue
return sugestao
return None
except Exception as e:
print(f"Erro ao buscar sugestão: {e}")
return None
finally:
await es.close()
async def conectar_elasticsearch() -> Optional[AsyncElasticsearch]:
"""
Conecta ao Elasticsearch e retorna o cliente.
Returns:
Optional[AsyncElasticsearch]: Cliente Elasticsearch ou None se falhar
"""
try:
es = AsyncElasticsearch(
"http://elasticsearch:9200",
request_timeout=30,
basic_auth=(
settings.ELASTICSEARCH_USER,
settings.ELASTICSEARCH_PASSWORD,
),
)
if not await es.ping():
raise ConnectionError("Não foi possível conectar ao Elasticsearch")
return es
except Exception as e:
print(f"Erro ao conectar com Elasticsearch: {e}")
return None
async def construir_ordenacao(ordenar_por: str) -> List[Dict[str, Any]]:
"""
Constrói a cláusula de ordenação para a consulta.
Args:
ordenar_por: Critério de ordenação
Returns:
List[Dict[str, Any]]: Lista de ordenação para Elasticsearch
"""
if ordenar_por == "data_asc":
return [{"data": {"order": "asc"}}]
elif ordenar_por == "data_desc":
return [{"data": {"order": "desc"}}]
else: # relevancia
return ["_score"]
async def preencher_numero_do_diario_com_zeros(numero_diario: str) -> str:
"""
Preenche o número do diário com zeros à esquerda.
Args:
numero_diario: Número do diário
Returns:
str: Número formatado com zeros à esquerda
"""
return numero_diario.zfill(4)
async def construir_filtros(
data_inicio: Optional[str],
data_fim: Optional[str],
tipo_diario: Optional[str],
numero_diario: Optional[str],
) -> List[Dict[str, Any]]:
"""
Constrói os filtros de data, tipo e número do diário.
Args:
data_inicio: Data inicial no formato YYYY-MM-DD
data_fim: Data final no formato YYYY-MM-DD
tipo_diario: Tipo de diário a ser filtrado
numero_diario: Número do diário
Returns:
List[Dict[str, Any]]: Lista de filtros para Elasticsearch
"""
filtros = []
parsed_dt_inicio = await parse_date(data_inicio)
parsed_dt_fim = await parse_date(data_fim)
if parsed_dt_inicio or parsed_dt_fim:
date_range = {}
if parsed_dt_inicio:
date_range["gte"] = parsed_dt_inicio
if parsed_dt_fim:
date_range["lte"] = parsed_dt_fim
filtros.append({"range": {"data": date_range}})
if tipo_diario:
filtros.append({"term": {"tipo.nome": tipo_diario}})
if numero_diario:
# numero_diario = await preencher_numero_do_diario_com_zeros(numero_diario)
filtros.append({"wildcard": {"numero": f"*{numero_diario}*"}})
return filtros
async def construir_query_busca(
query: Optional[str], modo_busca: str
) -> Dict[str, Any]:
"""
Constrói a query de busca com base no termo e modo de busca.
Args:
query: Termo de busca
modo_busca: Modo de busca (exata ou qualquer)
Returns:
Dict[str, Any]: Query de busca para Elasticsearch
"""
if not query:
return {
"nested": {
"path": "paginas",
"query": {"match_all": {}},
"inner_hits": {
"_source": ["paginas.numero", "paginas.conteudo"],
"size": 100,
},
}
}
should_queries = []
# Busca exata (boost maior)
should_queries.append(
{
"match_phrase": {
"paginas.conteudo.search": {"query": query, "slop": 3, "boost": 5.0}
}
}
)
# Busca mais leve (separando termos), se modo_busca permitir
if modo_busca == "qualquer":
should_queries.append(
{
"match": {
"paginas.conteudo.search": {
"query": query,
"operator": "or",
"fuzziness": "AUTO",
"prefix_length": 3,
"boost": 1.0,
}
}
}
)
return {
"nested": {
"path": "paginas",
"query": {"bool": {"should": should_queries, "minimum_should_match": 1}},
"inner_hits": {
"highlight": {
"fields": {"paginas.conteudo.search": {}},
"fragment_size": 500,
"number_of_fragments": 1,
"pre_tags": ["<mark>"],
"post_tags": ["</mark>"],
},
"_source": ["paginas.numero"],
"size": 100, # Aumentar para retornar mais páginas correspondentes
},
}
}
async def construir_request_body(
query: Optional[str],
modo_busca: str,
ordenar_por: str,
data_inicio: Optional[str],
data_fim: Optional[str],
tipo_diario: Optional[str],
numero_diario: Optional[str],
page: int,
page_size: int,
) -> Dict[str, Any]:
"""
Constrói o corpo da requisição para o Elasticsearch.
Args:
query: Termo de busca
modo_busca: Modo de busca (exata ou qualquer)
ordenar_por: Critério de ordenação
data_inicio: Data inicial
data_fim: Data final
tipo_diario: Tipo de diário
numero_diario: Número do diário
page: Número da página
page_size: Tamanho da página
Returns:
Dict[str, Any]: Corpo da requisição para Elasticsearch
"""
es_body = {
"query": {"bool": {"must": [], "filter": []}},
"size": page_size,
"from": (page - 1) * page_size,
"_source": [
"numero",
"data",
"link",
"tipo",
"paginas.numero",
"paginas.conteudo",
],
}
# Adicionar ordenação
es_body["sort"] = await construir_ordenacao(ordenar_por)
# Adicionar filtros
es_body["query"]["bool"]["filter"] = await construir_filtros(
data_inicio, data_fim, tipo_diario, numero_diario
)
# Adicionar query principal
query_principal = await construir_query_busca(query, modo_busca)
es_body["query"]["bool"]["must"].append(query_principal)
return es_body
async def processar_paginas_encontradas(
hit: Dict[str, Any], query: Optional[str]
) -> List[PaginaSchema]:
"""
Processa as páginas encontradas em um hit, retornando todas as correspondências.
Args:
hit: Item de resultado do Elasticsearch
query: Termo de busca original
Returns:
List[PaginaSchema]: Lista de páginas encontradas
"""
if not query:
return []
paginas = []
source = hit.get("_source", {})
#if not query and "paginas" in source:
# for pagina in source["paginas"]:
# paginas.append(
# PaginaSchema(
# numero=pagina.get("numero", 0), conteudo=pagina.get("conteudo", "")
# )
# )
# Se temos uma query e há inner_hits, processamos todas as páginas correspondentes
if query and "inner_hits" in hit and "paginas" in hit["inner_hits"]:
paginas_inner_hits = hit["inner_hits"]["paginas"]["hits"]["hits"]
# Processar todas as páginas encontradas, não apenas a primeira
for inner_hit in paginas_inner_hits:
inner_source = inner_hit.get("_source", {})
page_num = inner_source.get("numero", "N/A")
highlights = inner_hit.get("highlight", {}).get(
"paginas.conteudo.search", []
)
highlight_content = " ... ".join(highlights) if highlights else ""
if page_num != "N/A" and highlight_content:
paginas.append(
PaginaSchema(numero=page_num, conteudo=highlight_content)
)
# Se não há query ou não encontramos páginas nos inner_hits, usamos a primeira página do documento
if not paginas and "paginas" in source and source["paginas"]:
primeira_pagina_source = source["paginas"][0]
conteudo_orig = primeira_pagina_source.get("conteudo", "")
paginas.append(
PaginaSchema(
numero=primeira_pagina_source.get("numero", 0),
conteudo=conteudo_orig,
)
)
return paginas
async def processar_resultados(
response: Dict[str, Any], query: Optional[str], page: int, page_size: int
) -> Dict[str, Any]:
"""
Processa os resultados da busca no Elasticsearch.
Args:
response: Resposta do Elasticsearch
query: Termo de busca original
page: Número da página atual
page_size: Tamanho da página
Returns:
Dict[str, Any]: Resultados processados
"""
hits = response.get("hits", {})
total = hits.get("total", {}).get("value", 0)
resultados_formatados = []
for hit in hits.get("hits", []):
source = hit.get("_source", {})
resultado_data = {
"id": hit.get("_id"),
"numero": source.get("numero", ""),
"data": source.get("data"),
"link": source.get("link", ""),
"tipo": source.get("tipo", {}).get("nome", "Sem Tipo"),
"score": hit.get("_score"),
"paginas": [],
}
# Processar todas as páginas encontradas
resultado_data["paginas"] = await processar_paginas_encontradas(hit, query)
resultados_formatados.append(ResultadoSchema(**resultado_data))
return {
"total": total,
"resultados": resultados_formatados,
"pagina": page,
"por_pagina": page_size,
}
async def buscar_diarios_simples(
query: Optional[str] = None,
numero_diario: Optional[str] = None,
modo_busca: str = "exata", # "exata" ou "qualquer"
ordenar_por: str = "data_asc", # "relevancia" ou "data"
data_inicio: Optional[str] = None,
data_fim: Optional[str] = None,
tipo_diario: Optional[str] = None,
page: int = 1,
page_size: int = 10,
) -> Dict[str, Any]:
"""
Função principal para buscar diários oficiais.
Args:
query: Termo de busca
numero_diario: Número do diário oficial
modo_busca: Modo de busca (exata ou qualquer)
ordenar_por: Critério de ordenação
data_inicio: Data inicial
data_fim: Data final
tipo_diario: Tipo de diário
page: Número da página
page_size: Tamanho da página
Returns:
Dict[str, Any]: Dicionário com os resultados da busca
"""
# Conectar ao Elasticsearch
es = await conectar_elasticsearch()
if not es:
return {"total": 0, "resultados": [], "pagina": page, "por_pagina": page_size}
# Construir o corpo da requisição
es_body = await construir_request_body(
query,
modo_busca,
ordenar_por,
data_inicio,
data_fim,
tipo_diario,
numero_diario,
page,
page_size,
)
# Executar a busca
try:
response = await es.search(index="diario_oficial", body=es_body)
except Exception as e:
print(f"Erro ao executar busca no Elasticsearch: {e}")
return {"total": 0, "resultados": [], "pagina": page, "por_pagina": page_size}
finally:
await es.close()
# Processar e retornar os resultados
return await processar_resultados(response, query, page, page_size)
async def list_diarios(
es_client: AsyncElasticsearch,
page: int = 1,
page_size: int = 10,
data_inicio: Optional[str] = None,
data_fim: Optional[str] = None,
tipo_diario: Optional[str] = None,
numero_diario: Optional[str] = None,
ordenar_por: str = "data_desc"
) -> Dict[str, any]:
"""
Lista diários com paginação e filtros opcionais
Args:
es_client: Cliente do Elasticsearch
page: Número da página
page_size: Itens por página
data_inicio: Data inicial (YYYY-MM-DD)
data_fim: Data final (YYYY-MM-DD)
tipo_diario: Filtro por tipo de diário
numero_diario: Filtro por número do diário
ordenar_por: Campo para ordenação (data_desc, data_asc)
Returns:
Dict com total de itens e lista de diários
"""
# Construir query de filtros
filters = []
# Filtro por data
if data_inicio or data_fim:
date_range = {}
if data_inicio:
try:
datetime.strptime(data_inicio, "%Y-%m-%d")
date_range["gte"] = data_inicio
except ValueError:
pass
if data_fim:
try:
datetime.strptime(data_fim, "%Y-%m-%d")
date_range["lte"] = data_fim
except ValueError:
pass
if date_range:
filters.append({"range": {"data": date_range}})
# Filtro por tipo
if tipo_diario:
filters.append({"term": {"tipo.nome": tipo_diario}})
# Filtro por número
if numero_diario:
filters.append({"wildcard": {"numero": f"*{numero_diario}*"}})
# Construir query completa
query = {
"bool": {
"must": [{"match_all": {}}],
"filter": filters
}
}
# Definir ordenação
sort = [{"data": {"order": "asc" if ordenar_por == "data_asc" else "desc"}}]
# Executar consulta
try:
response = await es_client.search(
index="diario_oficial",
body={
"query": query,
"sort": sort,
"from": (page - 1) * page_size,
"size": page_size,
"_source": ["numero", "data", "tipo", "link"]
}
)
hits = response["hits"]["hits"]
total = response["hits"]["total"]["value"]
# Processar resultados
diarios = []
for hit in hits:
source = hit["_source"]
diarios.append(BuscaDiariosResponseSchema(
id=hit["_id"],
numero=source.get("numero"),
data=source.get("data"),
tipo=source.get("tipo", {}).get("nome"),
link=source.get("link")
))
return {
"total": total,
"page": page,
"page_size": page_size,
"results": diarios
}
except Exception as e:
print(f"Erro ao listar diários: {e}")
return {
"total": 0,
"page": page,
"page_size": page_size,
"results": []
}