adiciona o aplicativo de diarios
This commit is contained in:
848
diarios/search_service.py
Normal file
848
diarios/search_service.py
Normal file
@ -0,0 +1,848 @@
|
||||
import re
|
||||
from datetime import datetime
|
||||
from typing import Optional, Dict, Any, List
|
||||
from elasticsearch import Elasticsearch, AsyncElasticsearch
|
||||
from .schemas import BuscaDiariosResponseSchema, ResultadoSchema, PaginaSchema
|
||||
import unicodedata
|
||||
import asyncio
|
||||
from django.conf import settings
|
||||
|
||||
|
||||
async def is_fuzzy_appropriate(term: str) -> bool:
|
||||
"""
|
||||
Determina se a fuzziness é apropriada para o termo de busca.
|
||||
|
||||
Args:
|
||||
term: Termo de busca a ser avaliado
|
||||
|
||||
Returns:
|
||||
bool: True se fuzziness é apropriada, False caso contrário
|
||||
"""
|
||||
return not re.match(r"^\d+/\d+$", term.strip())
|
||||
|
||||
|
||||
async def parse_date(date_str: Optional[str]) -> Optional[str]:
|
||||
"""
|
||||
Converte string de data para formato ISO para ElasticSearch.
|
||||
|
||||
Args:
|
||||
date_str: String de data no formato YYYY-MM-DD
|
||||
|
||||
Returns:
|
||||
Optional[str]: Data formatada ou None se inválida
|
||||
"""
|
||||
if not date_str:
|
||||
return None
|
||||
try:
|
||||
dt = datetime.strptime(date_str, "%Y-%m-%d")
|
||||
return dt.strftime("%Y-%m-%d")
|
||||
except ValueError:
|
||||
print(f"Alerta: Formato de data inválido recebido: {date_str}")
|
||||
return None
|
||||
|
||||
|
||||
async def buscar_diarios(
|
||||
query: Optional[str] = None,
|
||||
data_inicio: Optional[str] = None,
|
||||
data_fim: Optional[str] = None,
|
||||
tipo_diario: Optional[str] = None,
|
||||
page: int = 1,
|
||||
page_size: int = 10,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Realiza busca nos diários oficiais com os parâmetros fornecidos.
|
||||
|
||||
Args:
|
||||
query: Termo de busca
|
||||
data_inicio: Data inicial no formato YYYY-MM-DD
|
||||
data_fim: Data final no formato YYYY-MM-DD
|
||||
tipo_diario: Tipo de diário a ser filtrado
|
||||
page: Número da página de resultados
|
||||
page_size: Quantidade de resultados por página
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: Dicionário com resultados da busca
|
||||
"""
|
||||
try:
|
||||
es = AsyncElasticsearch(
|
||||
"http://elasticsearch:9200",
|
||||
request_timeout=30,
|
||||
basic_auth=(
|
||||
settings.ELASTICSEARCH_USER,
|
||||
settings.ELASTICSEARCH_PASSWORD,
|
||||
),
|
||||
)
|
||||
if not await es.ping():
|
||||
raise ConnectionError("Não foi possível conectar ao Elasticsearch")
|
||||
except Exception as e:
|
||||
print(f"Erro ao conectar com Elasticsearch: {e}")
|
||||
return {"total": 0, "resultados": [], "pagina": page, "por_pagina": page_size}
|
||||
|
||||
es_body = {
|
||||
"query": {"bool": {"must": [], "filter": []}},
|
||||
"size": page_size,
|
||||
"from": (page - 1) * page_size,
|
||||
"_source": [
|
||||
"numero",
|
||||
"data",
|
||||
"link",
|
||||
"tipo",
|
||||
"paginas.numero",
|
||||
"paginas.conteudo",
|
||||
],
|
||||
}
|
||||
|
||||
parsed_dt_inicio = await parse_date(data_inicio)
|
||||
parsed_dt_fim = await parse_date(data_fim)
|
||||
if parsed_dt_inicio or parsed_dt_fim:
|
||||
date_range_filter = {}
|
||||
if parsed_dt_inicio:
|
||||
date_range_filter["gte"] = parsed_dt_inicio
|
||||
if parsed_dt_fim:
|
||||
date_range_filter["lte"] = parsed_dt_fim
|
||||
es_body["query"]["bool"]["filter"].append(
|
||||
{"range": {"data": date_range_filter}}
|
||||
)
|
||||
|
||||
if tipo_diario:
|
||||
es_body["query"]["bool"]["filter"].append({"term": {"tipo.nome": tipo_diario}})
|
||||
|
||||
if query:
|
||||
aplicar_fuzziness = await is_fuzzy_appropriate(query)
|
||||
text_query_bool = {
|
||||
"bool": {
|
||||
"should": [
|
||||
{
|
||||
"match_phrase": {
|
||||
"paginas.conteudo.search": {
|
||||
"query": query,
|
||||
"slop": 4,
|
||||
"boost": 5.0,
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"match": {
|
||||
"paginas.conteudo.search": {
|
||||
"query": query,
|
||||
"fuzziness": "AUTO",
|
||||
"operator": "and",
|
||||
"prefix_length": 3,
|
||||
}
|
||||
}
|
||||
},
|
||||
],
|
||||
"minimum_should_match": "75%",
|
||||
}
|
||||
}
|
||||
|
||||
nested_query = {
|
||||
"nested": {
|
||||
"path": "paginas",
|
||||
"query": text_query_bool,
|
||||
"inner_hits": {
|
||||
"highlight": {
|
||||
"fields": {"paginas.conteudo.search": {}},
|
||||
"fragment_size": 500,
|
||||
"number_of_fragments": 1,
|
||||
"pre_tags": ["<mark>"],
|
||||
"post_tags": ["</mark>"],
|
||||
},
|
||||
"_source": ["paginas.numero"],
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
es_body["query"]["bool"]["must"].append(nested_query)
|
||||
else:
|
||||
es_body["query"]["bool"]["must"].append({"match_all": {}})
|
||||
|
||||
# Adicionar min_score
|
||||
if query:
|
||||
es_body["min_score"] = 2.5 * len(query.split())
|
||||
try:
|
||||
response = await es.search(index="diario_oficial", body=es_body)
|
||||
except Exception as e:
|
||||
print(f"Erro ao executar busca no Elasticsearch: {e}")
|
||||
return {"total": 0, "resultados": [], "pagina": page, "por_pagina": page_size}
|
||||
finally:
|
||||
await es.close()
|
||||
|
||||
hits = response.get("hits", {})
|
||||
total = hits.get("total", {}).get("value", 0)
|
||||
resultados_formatados = []
|
||||
|
||||
for hit in hits.get("hits", []):
|
||||
source = hit.get("_source", {})
|
||||
resultado_data = {
|
||||
"id": hit.get("_id"),
|
||||
"numero": source.get("numero", ""),
|
||||
"data": source.get("data"),
|
||||
"link": source.get("link", ""),
|
||||
"tipo": source.get("tipo", {}).get("nome", "Sem Tipo"),
|
||||
"score": hit.get("_score"),
|
||||
"paginas": [],
|
||||
}
|
||||
|
||||
pagina_match = None
|
||||
if query and "inner_hits" in hit and "paginas" in hit["inner_hits"]:
|
||||
paginas_inner_hits = hit["inner_hits"]["paginas"]["hits"]["hits"]
|
||||
if paginas_inner_hits:
|
||||
inner_hit = paginas_inner_hits[0]
|
||||
inner_source = inner_hit.get("_source", {})
|
||||
page_num = inner_source.get("numero", "N/A")
|
||||
highlights = inner_hit.get("highlight", {}).get(
|
||||
"paginas.conteudo.search", []
|
||||
)
|
||||
highlight_content = " ... ".join(highlights) if highlights else ""
|
||||
|
||||
if page_num != "N/A" and highlight_content:
|
||||
pagina_match = PaginaSchema(
|
||||
numero=page_num, conteudo=highlight_content
|
||||
)
|
||||
|
||||
if pagina_match:
|
||||
resultado_data["paginas"].append(pagina_match)
|
||||
elif not query and "paginas" in source and source["paginas"]:
|
||||
primeira_pagina_source = source["paginas"][0]
|
||||
conteudo_orig = primeira_pagina_source.get("conteudo", "")
|
||||
resultado_data["paginas"].append(
|
||||
PaginaSchema(
|
||||
numero=primeira_pagina_source.get("numero", 0),
|
||||
conteudo=conteudo_orig,
|
||||
)
|
||||
)
|
||||
|
||||
resultados_formatados.append(ResultadoSchema(**resultado_data))
|
||||
|
||||
return {
|
||||
"total": total,
|
||||
"resultados": resultados_formatados,
|
||||
"pagina": page,
|
||||
"por_pagina": page_size,
|
||||
}
|
||||
|
||||
|
||||
async def remover_acentos(texto: str) -> str:
|
||||
"""
|
||||
Remove acentos de uma string para comparações mais neutras.
|
||||
|
||||
Args:
|
||||
texto: Texto a ser normalizado
|
||||
|
||||
Returns:
|
||||
str: Texto sem acentos
|
||||
"""
|
||||
return "".join(
|
||||
c
|
||||
for c in unicodedata.normalize("NFD", texto)
|
||||
if unicodedata.category(c) != "Mn"
|
||||
)
|
||||
|
||||
|
||||
async def processar_query(query: str) -> str:
|
||||
"""
|
||||
Faz pré-processamento da query para separar termos como 'processonº404'.
|
||||
|
||||
Args:
|
||||
query: Texto da consulta original
|
||||
|
||||
Returns:
|
||||
str: Consulta processada
|
||||
"""
|
||||
return re.sub(r"([a-zA-Z]+)(nº|n°|no)(\d+)", r"\1 \2 \3", query)
|
||||
|
||||
|
||||
async def montar_suggest_body(query_processada: str) -> dict:
|
||||
"""
|
||||
Monta o corpo da sugestão para a requisição ao Elasticsearch.
|
||||
|
||||
Args:
|
||||
query_processada: Query processada para sugestão
|
||||
|
||||
Returns:
|
||||
dict: Corpo da requisição para suggest
|
||||
"""
|
||||
return {
|
||||
"suggest": {
|
||||
"text": query_processada,
|
||||
"correcao": {
|
||||
"phrase": {
|
||||
"field": "paginas.conteudo.search",
|
||||
"size": 3,
|
||||
"gram_size": 2,
|
||||
"confidence": 0.8,
|
||||
"max_errors": 4,
|
||||
"highlight": {"pre_tag": "**", "post_tag": "**"},
|
||||
"collate": {
|
||||
"query": {
|
||||
"source": {
|
||||
"nested": {
|
||||
"path": "paginas",
|
||||
"query": {
|
||||
"bool": {
|
||||
"should": [
|
||||
{
|
||||
"match_phrase": {
|
||||
"paginas.conteudo.search": {
|
||||
"query": "{{suggestion}}",
|
||||
"slop": 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
"params": {"field_name": "paginas.conteudo.search"},
|
||||
"prune": True,
|
||||
},
|
||||
}
|
||||
},
|
||||
},
|
||||
"size": 0,
|
||||
}
|
||||
|
||||
|
||||
async def sugestao_termo(query: str) -> Optional[str]:
|
||||
"""
|
||||
Oferece sugestões de correção para a query, verificando se a sugestão
|
||||
realmente retorna resultados antes de apresentá-la ao usuário.
|
||||
|
||||
Args:
|
||||
query: Texto da consulta original
|
||||
|
||||
Returns:
|
||||
Optional[str]: Sugestão para a consulta ou None
|
||||
"""
|
||||
es = await conectar_elasticsearch()
|
||||
if not es:
|
||||
return None
|
||||
|
||||
query_processada = await processar_query(query)
|
||||
suggest_body = await montar_suggest_body(query_processada)
|
||||
|
||||
try:
|
||||
response = await es.search(index="diario_oficial", body=suggest_body)
|
||||
suggestions = response.get("suggest", {}).get("correcao", [])
|
||||
|
||||
for sug in suggestions:
|
||||
for option in sug.get("options", []):
|
||||
if option.get("collate_match", False):
|
||||
sugestao = option["text"]
|
||||
|
||||
# Verifica se a sugestão é idêntica à query (ignorando acentos e case)
|
||||
if sugestao.lower() == query.lower():
|
||||
continue
|
||||
if await remover_acentos(sugestao.lower()) == await remover_acentos(
|
||||
query.lower()
|
||||
):
|
||||
continue
|
||||
|
||||
return sugestao
|
||||
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
print(f"Erro ao buscar sugestão: {e}")
|
||||
return None
|
||||
finally:
|
||||
await es.close()
|
||||
|
||||
|
||||
async def conectar_elasticsearch() -> Optional[AsyncElasticsearch]:
|
||||
"""
|
||||
Conecta ao Elasticsearch e retorna o cliente.
|
||||
|
||||
Returns:
|
||||
Optional[AsyncElasticsearch]: Cliente Elasticsearch ou None se falhar
|
||||
"""
|
||||
try:
|
||||
es = AsyncElasticsearch(
|
||||
"http://elasticsearch:9200",
|
||||
request_timeout=30,
|
||||
basic_auth=(
|
||||
settings.ELASTICSEARCH_USER,
|
||||
settings.ELASTICSEARCH_PASSWORD,
|
||||
),
|
||||
)
|
||||
if not await es.ping():
|
||||
raise ConnectionError("Não foi possível conectar ao Elasticsearch")
|
||||
return es
|
||||
except Exception as e:
|
||||
print(f"Erro ao conectar com Elasticsearch: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def construir_ordenacao(ordenar_por: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Constrói a cláusula de ordenação para a consulta.
|
||||
|
||||
Args:
|
||||
ordenar_por: Critério de ordenação
|
||||
|
||||
Returns:
|
||||
List[Dict[str, Any]]: Lista de ordenação para Elasticsearch
|
||||
"""
|
||||
if ordenar_por == "data_asc":
|
||||
return [{"data": {"order": "asc"}}]
|
||||
elif ordenar_por == "data_desc":
|
||||
return [{"data": {"order": "desc"}}]
|
||||
else: # relevancia
|
||||
return ["_score"]
|
||||
|
||||
|
||||
async def preencher_numero_do_diario_com_zeros(numero_diario: str) -> str:
|
||||
"""
|
||||
Preenche o número do diário com zeros à esquerda.
|
||||
|
||||
Args:
|
||||
numero_diario: Número do diário
|
||||
|
||||
Returns:
|
||||
str: Número formatado com zeros à esquerda
|
||||
"""
|
||||
return numero_diario.zfill(4)
|
||||
|
||||
|
||||
async def construir_filtros(
|
||||
data_inicio: Optional[str],
|
||||
data_fim: Optional[str],
|
||||
tipo_diario: Optional[str],
|
||||
numero_diario: Optional[str],
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Constrói os filtros de data, tipo e número do diário.
|
||||
|
||||
Args:
|
||||
data_inicio: Data inicial no formato YYYY-MM-DD
|
||||
data_fim: Data final no formato YYYY-MM-DD
|
||||
tipo_diario: Tipo de diário a ser filtrado
|
||||
numero_diario: Número do diário
|
||||
|
||||
Returns:
|
||||
List[Dict[str, Any]]: Lista de filtros para Elasticsearch
|
||||
"""
|
||||
filtros = []
|
||||
|
||||
parsed_dt_inicio = await parse_date(data_inicio)
|
||||
parsed_dt_fim = await parse_date(data_fim)
|
||||
if parsed_dt_inicio or parsed_dt_fim:
|
||||
date_range = {}
|
||||
if parsed_dt_inicio:
|
||||
date_range["gte"] = parsed_dt_inicio
|
||||
if parsed_dt_fim:
|
||||
date_range["lte"] = parsed_dt_fim
|
||||
filtros.append({"range": {"data": date_range}})
|
||||
|
||||
if tipo_diario:
|
||||
filtros.append({"term": {"tipo.nome": tipo_diario}})
|
||||
|
||||
if numero_diario:
|
||||
# numero_diario = await preencher_numero_do_diario_com_zeros(numero_diario)
|
||||
filtros.append({"wildcard": {"numero": f"*{numero_diario}*"}})
|
||||
|
||||
return filtros
|
||||
|
||||
|
||||
async def construir_query_busca(
|
||||
query: Optional[str], modo_busca: str
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Constrói a query de busca com base no termo e modo de busca.
|
||||
|
||||
Args:
|
||||
query: Termo de busca
|
||||
modo_busca: Modo de busca (exata ou qualquer)
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: Query de busca para Elasticsearch
|
||||
"""
|
||||
if not query:
|
||||
return {
|
||||
"nested": {
|
||||
"path": "paginas",
|
||||
"query": {"match_all": {}},
|
||||
"inner_hits": {
|
||||
"_source": ["paginas.numero", "paginas.conteudo"],
|
||||
"size": 100,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
should_queries = []
|
||||
|
||||
# Busca exata (boost maior)
|
||||
should_queries.append(
|
||||
{
|
||||
"match_phrase": {
|
||||
"paginas.conteudo.search": {"query": query, "slop": 3, "boost": 5.0}
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
# Busca mais leve (separando termos), se modo_busca permitir
|
||||
if modo_busca == "qualquer":
|
||||
should_queries.append(
|
||||
{
|
||||
"match": {
|
||||
"paginas.conteudo.search": {
|
||||
"query": query,
|
||||
"operator": "or",
|
||||
"fuzziness": "AUTO",
|
||||
"prefix_length": 3,
|
||||
"boost": 1.0,
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
return {
|
||||
"nested": {
|
||||
"path": "paginas",
|
||||
"query": {"bool": {"should": should_queries, "minimum_should_match": 1}},
|
||||
"inner_hits": {
|
||||
"highlight": {
|
||||
"fields": {"paginas.conteudo.search": {}},
|
||||
"fragment_size": 500,
|
||||
"number_of_fragments": 1,
|
||||
"pre_tags": ["<mark>"],
|
||||
"post_tags": ["</mark>"],
|
||||
},
|
||||
"_source": ["paginas.numero"],
|
||||
"size": 100, # Aumentar para retornar mais páginas correspondentes
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
async def construir_request_body(
|
||||
query: Optional[str],
|
||||
modo_busca: str,
|
||||
ordenar_por: str,
|
||||
data_inicio: Optional[str],
|
||||
data_fim: Optional[str],
|
||||
tipo_diario: Optional[str],
|
||||
numero_diario: Optional[str],
|
||||
page: int,
|
||||
page_size: int,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Constrói o corpo da requisição para o Elasticsearch.
|
||||
|
||||
Args:
|
||||
query: Termo de busca
|
||||
modo_busca: Modo de busca (exata ou qualquer)
|
||||
ordenar_por: Critério de ordenação
|
||||
data_inicio: Data inicial
|
||||
data_fim: Data final
|
||||
tipo_diario: Tipo de diário
|
||||
numero_diario: Número do diário
|
||||
page: Número da página
|
||||
page_size: Tamanho da página
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: Corpo da requisição para Elasticsearch
|
||||
"""
|
||||
es_body = {
|
||||
"query": {"bool": {"must": [], "filter": []}},
|
||||
"size": page_size,
|
||||
"from": (page - 1) * page_size,
|
||||
"_source": [
|
||||
"numero",
|
||||
"data",
|
||||
"link",
|
||||
"tipo",
|
||||
"paginas.numero",
|
||||
"paginas.conteudo",
|
||||
],
|
||||
}
|
||||
|
||||
# Adicionar ordenação
|
||||
es_body["sort"] = await construir_ordenacao(ordenar_por)
|
||||
|
||||
# Adicionar filtros
|
||||
es_body["query"]["bool"]["filter"] = await construir_filtros(
|
||||
data_inicio, data_fim, tipo_diario, numero_diario
|
||||
)
|
||||
|
||||
# Adicionar query principal
|
||||
query_principal = await construir_query_busca(query, modo_busca)
|
||||
es_body["query"]["bool"]["must"].append(query_principal)
|
||||
|
||||
return es_body
|
||||
|
||||
|
||||
async def processar_paginas_encontradas(
|
||||
hit: Dict[str, Any], query: Optional[str]
|
||||
) -> List[PaginaSchema]:
|
||||
"""
|
||||
Processa as páginas encontradas em um hit, retornando todas as correspondências.
|
||||
|
||||
Args:
|
||||
hit: Item de resultado do Elasticsearch
|
||||
query: Termo de busca original
|
||||
|
||||
Returns:
|
||||
List[PaginaSchema]: Lista de páginas encontradas
|
||||
"""
|
||||
if not query:
|
||||
return []
|
||||
|
||||
paginas = []
|
||||
source = hit.get("_source", {})
|
||||
|
||||
#if not query and "paginas" in source:
|
||||
# for pagina in source["paginas"]:
|
||||
# paginas.append(
|
||||
# PaginaSchema(
|
||||
# numero=pagina.get("numero", 0), conteudo=pagina.get("conteudo", "")
|
||||
# )
|
||||
# )
|
||||
# Se temos uma query e há inner_hits, processamos todas as páginas correspondentes
|
||||
if query and "inner_hits" in hit and "paginas" in hit["inner_hits"]:
|
||||
paginas_inner_hits = hit["inner_hits"]["paginas"]["hits"]["hits"]
|
||||
|
||||
# Processar todas as páginas encontradas, não apenas a primeira
|
||||
for inner_hit in paginas_inner_hits:
|
||||
inner_source = inner_hit.get("_source", {})
|
||||
page_num = inner_source.get("numero", "N/A")
|
||||
highlights = inner_hit.get("highlight", {}).get(
|
||||
"paginas.conteudo.search", []
|
||||
)
|
||||
highlight_content = " ... ".join(highlights) if highlights else ""
|
||||
|
||||
if page_num != "N/A" and highlight_content:
|
||||
paginas.append(
|
||||
PaginaSchema(numero=page_num, conteudo=highlight_content)
|
||||
)
|
||||
|
||||
# Se não há query ou não encontramos páginas nos inner_hits, usamos a primeira página do documento
|
||||
if not paginas and "paginas" in source and source["paginas"]:
|
||||
primeira_pagina_source = source["paginas"][0]
|
||||
conteudo_orig = primeira_pagina_source.get("conteudo", "")
|
||||
paginas.append(
|
||||
PaginaSchema(
|
||||
numero=primeira_pagina_source.get("numero", 0),
|
||||
conteudo=conteudo_orig,
|
||||
)
|
||||
)
|
||||
|
||||
return paginas
|
||||
|
||||
|
||||
async def processar_resultados(
|
||||
response: Dict[str, Any], query: Optional[str], page: int, page_size: int
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Processa os resultados da busca no Elasticsearch.
|
||||
|
||||
Args:
|
||||
response: Resposta do Elasticsearch
|
||||
query: Termo de busca original
|
||||
page: Número da página atual
|
||||
page_size: Tamanho da página
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: Resultados processados
|
||||
"""
|
||||
hits = response.get("hits", {})
|
||||
total = hits.get("total", {}).get("value", 0)
|
||||
resultados_formatados = []
|
||||
|
||||
for hit in hits.get("hits", []):
|
||||
source = hit.get("_source", {})
|
||||
resultado_data = {
|
||||
"id": hit.get("_id"),
|
||||
"numero": source.get("numero", ""),
|
||||
"data": source.get("data"),
|
||||
"link": source.get("link", ""),
|
||||
"tipo": source.get("tipo", {}).get("nome", "Sem Tipo"),
|
||||
"score": hit.get("_score"),
|
||||
"paginas": [],
|
||||
}
|
||||
|
||||
# Processar todas as páginas encontradas
|
||||
resultado_data["paginas"] = await processar_paginas_encontradas(hit, query)
|
||||
|
||||
resultados_formatados.append(ResultadoSchema(**resultado_data))
|
||||
|
||||
return {
|
||||
"total": total,
|
||||
"resultados": resultados_formatados,
|
||||
"pagina": page,
|
||||
"por_pagina": page_size,
|
||||
}
|
||||
|
||||
|
||||
async def buscar_diarios_simples(
|
||||
query: Optional[str] = None,
|
||||
numero_diario: Optional[str] = None,
|
||||
modo_busca: str = "exata", # "exata" ou "qualquer"
|
||||
ordenar_por: str = "data_asc", # "relevancia" ou "data"
|
||||
data_inicio: Optional[str] = None,
|
||||
data_fim: Optional[str] = None,
|
||||
tipo_diario: Optional[str] = None,
|
||||
page: int = 1,
|
||||
page_size: int = 10,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Função principal para buscar diários oficiais.
|
||||
|
||||
Args:
|
||||
query: Termo de busca
|
||||
numero_diario: Número do diário oficial
|
||||
modo_busca: Modo de busca (exata ou qualquer)
|
||||
ordenar_por: Critério de ordenação
|
||||
data_inicio: Data inicial
|
||||
data_fim: Data final
|
||||
tipo_diario: Tipo de diário
|
||||
page: Número da página
|
||||
page_size: Tamanho da página
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: Dicionário com os resultados da busca
|
||||
"""
|
||||
# Conectar ao Elasticsearch
|
||||
es = await conectar_elasticsearch()
|
||||
if not es:
|
||||
return {"total": 0, "resultados": [], "pagina": page, "por_pagina": page_size}
|
||||
|
||||
# Construir o corpo da requisição
|
||||
es_body = await construir_request_body(
|
||||
query,
|
||||
modo_busca,
|
||||
ordenar_por,
|
||||
data_inicio,
|
||||
data_fim,
|
||||
tipo_diario,
|
||||
numero_diario,
|
||||
page,
|
||||
page_size,
|
||||
)
|
||||
|
||||
# Executar a busca
|
||||
try:
|
||||
response = await es.search(index="diario_oficial", body=es_body)
|
||||
except Exception as e:
|
||||
print(f"Erro ao executar busca no Elasticsearch: {e}")
|
||||
return {"total": 0, "resultados": [], "pagina": page, "por_pagina": page_size}
|
||||
finally:
|
||||
await es.close()
|
||||
|
||||
# Processar e retornar os resultados
|
||||
return await processar_resultados(response, query, page, page_size)
|
||||
|
||||
|
||||
async def list_diarios(
|
||||
es_client: AsyncElasticsearch,
|
||||
page: int = 1,
|
||||
page_size: int = 10,
|
||||
data_inicio: Optional[str] = None,
|
||||
data_fim: Optional[str] = None,
|
||||
tipo_diario: Optional[str] = None,
|
||||
numero_diario: Optional[str] = None,
|
||||
ordenar_por: str = "data_desc"
|
||||
) -> Dict[str, any]:
|
||||
"""
|
||||
Lista diários com paginação e filtros opcionais
|
||||
|
||||
Args:
|
||||
es_client: Cliente do Elasticsearch
|
||||
page: Número da página
|
||||
page_size: Itens por página
|
||||
data_inicio: Data inicial (YYYY-MM-DD)
|
||||
data_fim: Data final (YYYY-MM-DD)
|
||||
tipo_diario: Filtro por tipo de diário
|
||||
numero_diario: Filtro por número do diário
|
||||
ordenar_por: Campo para ordenação (data_desc, data_asc)
|
||||
|
||||
Returns:
|
||||
Dict com total de itens e lista de diários
|
||||
"""
|
||||
# Construir query de filtros
|
||||
filters = []
|
||||
|
||||
# Filtro por data
|
||||
if data_inicio or data_fim:
|
||||
date_range = {}
|
||||
if data_inicio:
|
||||
try:
|
||||
datetime.strptime(data_inicio, "%Y-%m-%d")
|
||||
date_range["gte"] = data_inicio
|
||||
except ValueError:
|
||||
pass
|
||||
if data_fim:
|
||||
try:
|
||||
datetime.strptime(data_fim, "%Y-%m-%d")
|
||||
date_range["lte"] = data_fim
|
||||
except ValueError:
|
||||
pass
|
||||
if date_range:
|
||||
filters.append({"range": {"data": date_range}})
|
||||
|
||||
# Filtro por tipo
|
||||
if tipo_diario:
|
||||
filters.append({"term": {"tipo.nome": tipo_diario}})
|
||||
|
||||
# Filtro por número
|
||||
if numero_diario:
|
||||
filters.append({"wildcard": {"numero": f"*{numero_diario}*"}})
|
||||
|
||||
# Construir query completa
|
||||
query = {
|
||||
"bool": {
|
||||
"must": [{"match_all": {}}],
|
||||
"filter": filters
|
||||
}
|
||||
}
|
||||
|
||||
# Definir ordenação
|
||||
sort = [{"data": {"order": "asc" if ordenar_por == "data_asc" else "desc"}}]
|
||||
|
||||
# Executar consulta
|
||||
try:
|
||||
response = await es_client.search(
|
||||
index="diario_oficial",
|
||||
body={
|
||||
"query": query,
|
||||
"sort": sort,
|
||||
"from": (page - 1) * page_size,
|
||||
"size": page_size,
|
||||
"_source": ["numero", "data", "tipo", "link"]
|
||||
}
|
||||
)
|
||||
|
||||
hits = response["hits"]["hits"]
|
||||
total = response["hits"]["total"]["value"]
|
||||
|
||||
# Processar resultados
|
||||
diarios = []
|
||||
for hit in hits:
|
||||
source = hit["_source"]
|
||||
diarios.append(BuscaDiariosResponseSchema(
|
||||
id=hit["_id"],
|
||||
numero=source.get("numero"),
|
||||
data=source.get("data"),
|
||||
tipo=source.get("tipo", {}).get("nome"),
|
||||
link=source.get("link")
|
||||
))
|
||||
|
||||
return {
|
||||
"total": total,
|
||||
"page": page,
|
||||
"page_size": page_size,
|
||||
"results": diarios
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
print(f"Erro ao listar diários: {e}")
|
||||
return {
|
||||
"total": 0,
|
||||
"page": page,
|
||||
"page_size": page_size,
|
||||
"results": []
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user