Updated modules and DB functions - service needs DB update function added
This commit is contained in:
@ -1,9 +1,17 @@
|
||||
import sqlite3 as sql
|
||||
import time
|
||||
|
||||
tbl_artist_info_ddl = str('CREATE TABLE artist_info ('
|
||||
'artist_id TEXT NOT NULL PRIMARY KEY,'
|
||||
'artist_info TEXT,'
|
||||
'last_update TEXT);')
|
||||
'artist_id TEXT NOT NULL PRIMARY KEY,'
|
||||
'artist_name TEXT,'
|
||||
'artist_info TEXT,'
|
||||
'mb_artist_id TEXT,'
|
||||
'image_url TEXT,'
|
||||
'wikidata_url TEXT,'
|
||||
'wikipedia_url TEXT,'
|
||||
'wikipedia_image TEXT,'
|
||||
'wikipedia_extract TEXT,'
|
||||
'last_update TEXT);')
|
||||
|
||||
class SQLiteDatabase(object):
|
||||
def __init__(self, db_filename):
|
||||
@ -46,6 +54,7 @@ class SQLiteDatabase(object):
|
||||
cursor.execute(query, params)
|
||||
else:
|
||||
cursor.execute(query)
|
||||
print("%s rows affected"%cursor.rowcount)
|
||||
return cursor
|
||||
except sql.Error as e:
|
||||
print("SQLite error %s"%e.args[0])
|
||||
@ -55,11 +64,36 @@ class SQLiteDatabase(object):
|
||||
print("Error params %s"%str(params))
|
||||
print("Error params type %s"%type(params))
|
||||
|
||||
def update_artist(self, artist_id, artist_info, update_time):
|
||||
success = False
|
||||
query = 'INSERT or REPLACE INTO artist_info VALUES (?, ?, ?)'
|
||||
params = (str(artist_id), str(artist_info), str(update_time))
|
||||
def get_record_age(self, artist_id):
|
||||
try:
|
||||
last_update = self.get_value(artist_id, 'last_update')
|
||||
record_age = round(time.time())-round(float(last_update[0][0]))
|
||||
return record_age
|
||||
except Exception as e:
|
||||
print("get_record_age failed %s" % e)
|
||||
return
|
||||
|
||||
def get_artist_info(self, artist_id):
|
||||
query = 'SELECT * FROM artist_info WHERE artist_id = ?'# %str(artist_id)
|
||||
params = [str(artist_id)]
|
||||
cursor = self.run_query(query, params)
|
||||
return cursor.fetchall()
|
||||
|
||||
def get_all(self):
|
||||
query = 'SELECT * FROM artist_info'
|
||||
#params = [str(artist_id)]
|
||||
cursor = self.run_query(query)#, params)
|
||||
return cursor.fetchall()
|
||||
|
||||
def update_value(self, artist_id, field_name, field_value):
|
||||
success = False
|
||||
query = 'UPDATE artist_info SET %s = ?, last_update = ? WHERE artist_id = ?' % str(field_name)
|
||||
params = [str(field_value), str(time.time()), str(artist_id)]
|
||||
cursor = self.run_query(query, params)
|
||||
if (cursor.rowcount == 0):
|
||||
query = 'INSERT OR IGNORE INTO artist_info (artist_id, %s, last_update) VALUES (?, ?, ?)' % str(field_name)
|
||||
params = [str(artist_id), str(field_value), str(time.time())]
|
||||
cursor = self.run_query(query, params)
|
||||
try:
|
||||
self.conn.commit()
|
||||
success = True
|
||||
@ -68,8 +102,8 @@ class SQLiteDatabase(object):
|
||||
pass
|
||||
return success
|
||||
|
||||
def get_artist_info(self, artist_id):
|
||||
query = 'SELECT * FROM artist_info WHERE artist_id = ?'
|
||||
def get_value(self, artist_id, field_name):
|
||||
query = 'SELECT %s FROM artist_info WHERE artist_id = ?' % str(field_name)
|
||||
params = [str(artist_id)]
|
||||
cursor = self.run_query(query, params)
|
||||
return cursor.fetchall()
|
||||
|
||||
6
lib/musicbrainz/__init__.py
Normal file
6
lib/musicbrainz/__init__.py
Normal file
@ -0,0 +1,6 @@
|
||||
"""
|
||||
Musicbrainz utilities for plugin.audio.subsonic
|
||||
"""
|
||||
from .mbconnection import *
|
||||
|
||||
__version__ = '0.0.1'
|
||||
146
lib/musicbrainz/mbconnection.py
Normal file
146
lib/musicbrainz/mbconnection.py
Normal file
@ -0,0 +1,146 @@
|
||||
import os
|
||||
import json
|
||||
import urllib.request
|
||||
from urllib.parse import urlencode
|
||||
import xml.etree.ElementTree as ET
|
||||
import re
|
||||
|
||||
urllib.request.install_opener(urllib.request.build_opener(urllib.request.HTTPSHandler))
|
||||
|
||||
class MBConnection(object):
|
||||
def __init__(self, lang = "en"):
|
||||
self._lang = lang
|
||||
self._baseUrl = "https://musicbrainz.org/ws/2"
|
||||
self._wikipediaBaseUrl = "https://{}.wikipedia.org/wiki".format(self._lang)
|
||||
self._wikidataBaseUrl = "https://www.wikidata.org/wiki/Special:EntityData"
|
||||
self._opener = self._getOpener()
|
||||
|
||||
def _getOpener(self):
|
||||
opener = urllib.request.build_opener(urllib.request.HTTPSHandler)
|
||||
return opener
|
||||
|
||||
def search(self, entity, query, limit=None, offset=None):
|
||||
viewName = '%s' % entity
|
||||
q = self._getQueryDict({'query': query, 'limit': limit, 'offset': offset})
|
||||
req = self._getRequest(self._baseUrl, viewName, q)
|
||||
res = self._doInfoReqXml(req)
|
||||
return res
|
||||
|
||||
def get_wiki_extract(self, title):
|
||||
try:
|
||||
if('http' in title):
|
||||
#accepts search text or full url https://en.wikipedia.org/wiki/Alex_Lloyd
|
||||
pattern = 'wikipedia.org/wiki/(.+)'
|
||||
title = re.search(pattern, title).group(1)
|
||||
viewName = 'api.php'
|
||||
q = self._getQueryDict({'format' : 'json', 'action' : 'query', 'prop' : 'extracts', 'redirects' : 1, 'titles' : title})
|
||||
req = self._getRequest(self._wikipediaBaseUrl[:-3], viewName, q, 'exintro&explaintext&')
|
||||
res = self._doInfoReqJson(req)
|
||||
pages = res['query']['pages']
|
||||
extract = list(pages.values())[0]['extract']
|
||||
return extract
|
||||
except Exception as e:
|
||||
print("get_artist_wikpedia failed %s"%e)
|
||||
return
|
||||
|
||||
#https://en.wikipedia.org/w/api.php?exintro&explaintext&format=json&action=query&prop=extracts&redirects=1&titles=%C3%89milie_Simon
|
||||
def get_wiki_image(self, title):
|
||||
try:
|
||||
if('http' in title):
|
||||
#accepts search text or full url https://en.wikipedia.org/wiki/Alex_Lloyd
|
||||
pattern = 'wikipedia.org/wiki/(.+)'
|
||||
title = re.search(pattern, title).group(1)
|
||||
viewName = 'api.php'
|
||||
q = self._getQueryDict({'format' : 'json', 'action' : 'query', 'prop' : 'pageimages', 'pithumbsize' : 800, 'titles' : title})
|
||||
req = self._getRequest(self._wikipediaBaseUrl[:-3], viewName, q)
|
||||
res = self._doInfoReqJson(req)
|
||||
pages = res['query']['pages']
|
||||
print(res['query']['pages'])
|
||||
image_url = list(pages.values())[0]['thumbnail']['source']
|
||||
return image_url
|
||||
except Exception as e:
|
||||
print("get_wiki_image failed %s"%e)
|
||||
return
|
||||
|
||||
def get_artist_id(self, query):
|
||||
try:
|
||||
dres = self.search('artist', query)
|
||||
artist_list = dres.find('{http://musicbrainz.org/ns/mmd-2.0#}artist-list')
|
||||
artist = artist_list.find('{http://musicbrainz.org/ns/mmd-2.0#}artist')
|
||||
return artist.attrib['id']
|
||||
except Exception as e:
|
||||
print("get_artist_id failed %s"%e)
|
||||
return
|
||||
|
||||
def get_relation(self, artist_id, rel_type):
|
||||
try:
|
||||
viewName = 'artist/%s' % artist_id
|
||||
q = self._getQueryDict({'inc': "url-rels"})
|
||||
req = self._getRequest(self._baseUrl, viewName, q)
|
||||
res = self._doInfoReqXml(req)
|
||||
for relation in res.iter('{http://musicbrainz.org/ns/mmd-2.0#}relation'):
|
||||
if relation.attrib['type'] == rel_type:
|
||||
return relation.find('{http://musicbrainz.org/ns/mmd-2.0#}target').text
|
||||
except Exception as e:
|
||||
print("get_artist_image failed %s"%e)
|
||||
return
|
||||
|
||||
def get_artist_image(self, artist_id):
|
||||
try:
|
||||
image = self.get_relation(artist_id, 'image')
|
||||
return image
|
||||
except Exception as e:
|
||||
print("get_artist_image failed %s"%e)
|
||||
return
|
||||
|
||||
def get_artist_wikpedia(self, artist_id):
|
||||
wikidata_url = self.get_relation(artist_id, 'wikidata')
|
||||
pattern = 'www.wikidata.org/wiki/(Q\d+)'
|
||||
try:
|
||||
wikidata_ref = re.search(pattern, wikidata_url).group(1)
|
||||
viewName = '%s.rdf' % wikidata_ref
|
||||
q = self._getQueryDict({})
|
||||
req = self._getRequest(self._wikidataBaseUrl, viewName, q )
|
||||
res = self._doInfoReqXml(req)
|
||||
for item in res.iter('{http://www.w3.org/1999/02/22-rdf-syntax-ns#}Description'):
|
||||
try:
|
||||
url = item.attrib['{http://www.w3.org/1999/02/22-rdf-syntax-ns#}about']
|
||||
if self._wikipediaBaseUrl in url:
|
||||
#print(urlencode(url))
|
||||
#print((url.encode().decode('unicode-escape')))
|
||||
return urllib.parse.unquote(url)
|
||||
except KeyError:
|
||||
pass
|
||||
except Exception as e:
|
||||
print("get_artist_wikpedia failed %s"%e)
|
||||
return
|
||||
|
||||
def _getQueryDict(self, d):
|
||||
"""
|
||||
Given a dictionary, it cleans out all the values set to None
|
||||
"""
|
||||
for k, v in list(d.items()):
|
||||
if v is None:
|
||||
del d[k]
|
||||
return d
|
||||
|
||||
def _getRequest(self, baseUrl, viewName, query={}, prefix=""):
|
||||
qdict = {}
|
||||
qdict.update(query)
|
||||
url = '%s/%s' % (baseUrl, viewName)
|
||||
if(prefix!='' or qdict!={}):
|
||||
url += "?%s%s" % (prefix, urlencode(qdict))
|
||||
print("UseGET URL %s" % (url))
|
||||
req = urllib.request.Request(url)
|
||||
return req
|
||||
|
||||
def _doInfoReqXml(self, req):
|
||||
res = urllib.request.urlopen(req)
|
||||
data = res.read().decode('utf-8')
|
||||
dres = ET.fromstring(data)
|
||||
return dres
|
||||
|
||||
def _doInfoReqJson(self, req):
|
||||
res = urllib.request.urlopen(req)
|
||||
dres = json.loads(res.read().decode('utf-8'))
|
||||
return dres
|
||||
Reference in New Issue
Block a user